Sunday, March 24, 2013

Reading large files in chunks with proper encapsulation

I've been doing some work lately which involves sequentially reading large files (2 to 5 GB). Reading the whole structure into memory isn't an option; it's more reliable to process the file in chunks. I occasionally come across legacy code that solves exactly this problem, but in a procedural way, resulting in tangled spaghetti. To be honest, the first piece of software I ever wrote in a professional setting also went at it the wrong way.

There is no reason to let it come to this though; you can use the often overlooked yield return keyword to improve encapsulation.
When you use the yield keyword in a statement, you indicate that the method, operator, or get accessor in which it appears is an iterator. You consume an iterator method by using a foreach statement or LINQ query. Each iteration of the foreach loop calls the iterator method. When a yield return statement is reached in the iterator method, the expression is returned, and the current location in code is retained. Execution is restarted from that location the next time the iterator function is called.
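To see the deferred execution at work, consider this minimal sketch (my example, not tied to the file reading below):
static IEnumerable<int> Numbers()
{
    Console.WriteLine("Running up to the first yield");
    yield return 1; // execution pauses here until the next iteration

    Console.WriteLine("Resumed for the second yield");
    yield return 2;
}

// nothing runs until the foreach starts pulling values
foreach (var number in Numbers())
    Console.WriteLine(number);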
Have a look at the following Reader class, which takes advantage of yield return. It reads a file line by line, building up a chunk, and returns it once the desired chunk size is reached. On the next iteration, the call continues right after the yield: it clears the lines - releasing memory - and starts building up the next chunk.
public class Reader
{
    private readonly int _chunkSize;

    public Reader(int chunkSize) 
    {
        _chunkSize = chunkSize;
    }

    public IEnumerable<Chunk> Read(string path)
    {
        if (string.IsNullOrEmpty(path))
            throw new ArgumentNullException("path");

        var lines = new List<string>();

        using (var reader = new StreamReader(path))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                lines.Add(line);

                if (lines.Count == _chunkSize)
                {
                    yield return new Chunk(lines);

                    // release the buffered lines, and start building the next chunk
                    lines.Clear();
                }
            }                
        }

        // don't lose the last, partially filled chunk
        if (lines.Count > 0)
            yield return new Chunk(lines);
    }
}

public class Chunk
{
    public Chunk(List<string> lines) 
    {
        // copy the lines; the reader clears its buffer after yielding,
        // and we don't want that to empty this chunk as well
        Lines = new List<string>(lines);
    }

    public List<string> Lines { get; private set; }
}
And that's one way to achieve clean encapsulation without starving your machine's memory.
var reader = new Reader(chunkSize: 1000);
var chunks = reader.Read(@"C:\big_file.txt");

foreach (var chunk in chunks)            
    Console.WriteLine(chunk.Lines.Count);
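One subtlety worth pointing out (an addition of mine, not part of the original design): because Read is an iterator, the argument check doesn't run until the first MoveNext - the exception surfaces when you start enumerating, not when you call Read. If you want eager validation, split the method in two:
public IEnumerable<Chunk> Read(string path)
{
    // validates immediately, before any deferred execution kicks in
    if (string.IsNullOrEmpty(path))
        throw new ArgumentNullException("path");

    return ReadChunks(path);
}

private IEnumerable<Chunk> ReadChunks(string path)
{
    // the same line-by-line chunking loop as before
    var lines = new List<string>();

    using (var reader = new StreamReader(path))
    {
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            lines.Add(line);

            if (lines.Count == _chunkSize)
            {
                yield return new Chunk(lines);

                lines.Clear();
            }
        }
    }

    if (lines.Count > 0)
        yield return new Chunk(lines);
}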

Sunday, March 17, 2013

Putting my IronMQ experiment under stress

Two weeks ago, I shared my first impressions of IronMQ. Last week, I looked at some infrastructure to facilitate pulling from IronMQ. This implementation worked, but I hadn't put it under stress yet; "First make it work, then make it fast", and all of that.

I arranged a simple scenario for testing: one message type - thus one queue - with eight queue consumers simultaneously pulling messages from that queue and dispatching them to a handler which sleeps for one second.
public class MessageSleepForOneSecond { }

public class MessageSleepForOneSecondHandler : IMessageHandler<MessageSleepForOneSecond>
{
    public void Handle(MessageSleepForOneSecond message)
    {
        Thread.Sleep(1000);
    }
} 
To establish a baseline, I foolishly set the polling interval to only 100ms, and pulled 2000 messages from the queue one at a time. With this configuration I processed all 2000 messages in 2 minutes and 20 seconds, with an average throughput of 14.3 messages per second. In theory you would expect the throughput to be higher though.
The constraint in this story is the CLR's thread pool. Every time a queue consumer's internal timer ticks, the callback which pulls from the queue and invokes the message handler takes up a thread on the thread pool. The thread pool makes a few threads available when you start your application, but once they're all in use, it has to start new ones, which is rather expensive. More importantly, when you're queuing too many tasks on the thread pool and the number of active threads exceeds the number of processors, it slows down thread creation, waiting 500ms to see if an existing thread can be reused before starting a new one. When the maximum number of threads is reached, the thread pool will still enlist your tasks in its queue, but only start processing them once threads become available again. In short, the thread pool has a few tricks up its sleeve to protect you from saturating your resources. Remember that too much parallelization, with its corresponding context switches, won't do you any good.
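You can peek at these numbers yourself through the System.Threading.ThreadPool class; a quick diagnostic sketch, not part of the original test code:
int workerThreads, completionPortThreads;

ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);
Console.WriteLine("Min worker threads: {0}", workerThreads);

ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
Console.WriteLine("Max worker threads: {0}", workerThreads);

ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);
Console.WriteLine("Available worker threads: {0}", workerThreads);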

Having established a baseline, and having learned a bit more about how the thread pool behaves, I tried one of the first optimizations I already had in mind last week: pulling batches instead of single messages. This reduces the number of HTTP requests needed, and the number of threads to do the work on. To support this, I extended the queue consumer configuration with a new property, and changed the queue consumer to take the batch size into account.
public interface IQueueConsumerConfiguration<T>
{
    int PollingInterval { get; }

    int BatchSize { get; }
}
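The post doesn't show the accompanying implementation, but for this test run it would boil down to something like this:
public class QueueConsumerConfiguration<T> : IQueueConsumerConfiguration<T>
{
    public int PollingInterval
    {
        get { return 100; } // the interval used in this test
    }

    public int BatchSize
    {
        get { return 30; } // messages pulled per request
    }
}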

// inside the queue consumer's timer callback
try
{
    var messages = (IEnumerable<Message>)null;

    if (!_queue.TryGet(out messages, _queueConsumerConfiguration.BatchSize))
        return;

    foreach (var message in messages)
    {
        try
        {
            var messageBody = (T)JsonConvert.DeserializeObject(message.Body, typeof(T));

            _messageDispatcher.Dispatch<T>(messageBody);

            _queue.Delete(message.Id);
        }
        catch (Exception ex)
        {
            _errorHandler.Handle(ex, message);
        }
    }
}
catch (Exception ex)
{
    _errorHandler.Handle(ex, null);
}                
On repeating the test with 2000 messages and the same polling interval of 100ms, but now with a batch size of 30, all messages were processed in one minute and fifteen seconds, resulting in a throughput of 26 messages per second. That's almost a 100% improvement.

This throughput wouldn't be sustainable though if we had a lot more messages to process. We're starting a new thread every 100ms or 500ms, while the work we do on it only finishes after roughly 30 seconds (it's not just invoking the handlers; the HTTP requests also take time). We're burning through threads quicker than we're releasing them. If we ran out of threads on the thread pool, it would simply stop starting new ones, and queue the tasks until other threads are done doing their work.

In my previous post I also considered a smart polling algorithm, but I haven't looked at that yet; what's in place is more than good enough for me at the moment.

Be sure to take these numbers with a grain of salt. I would have to test my infrastructure with millions of messages on the queue instead of just 2000 to get trustworthy results. I feel I can predict fairly well how the system would behave when put under load for a longer period of time though; it would grind to a halt. As mentioned before, we would run out of threads to do work on. I simulated this by lowering the thread pool's maximum number of threads. Other parameters that influence the numbers in this test are the size of the messages, the version of the runtime, the operating system, the number of processors, the latency of the network, and so on. I ran these tests with empty messages and .NET 4, on my own Windows 7 box with an Intel i7 on board.
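For reference, capping the thread pool - which is how I simulated running out of threads - comes down to one call (a sketch; the numbers are arbitrary):
int workerThreads, completionPortThreads;
ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);

// returns false when the new maximum is lower than the number of processors
ThreadPool.SetMaxThreads(4, completionPortThreads);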

It comes down to cherry-picking a configuration per queue consumer that will be sustainable based on the number of messages you expect, the desired throughput, and the time it takes to process a single message.
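As a back-of-envelope aid - my own sketch, using the numbers from this test - you can estimate how many threads a single consumer keeps in flight:
double pollingIntervalMs = 100;
double batchSize = 30;
double msPerMessage = 1000; // the handler sleeps for one second

// each tick occupies a thread pool thread for roughly this long,
// since the messages in a batch are handled sequentially
double msPerBatch = batchSize * msPerMessage;

// if this number keeps exceeding what the thread pool will give you,
// the configuration isn't sustainable
double threadsInFlight = msPerBatch / pollingIntervalMs;

Console.WriteLine("~{0} concurrent batches per consumer", threadsInFlight); // ~300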

Find the source on GitHub.

Sunday, March 10, 2013

Some experimental infrastructure for IronMQ pull

I wrote about using IronMQ as a cloud-based message queue last week. In that post I explained that you can go about using IronMQ in two ways; either you pull from the queue yourself, or you let IronMQ push messages from the queue to your HTTP endpoints. At first sight, the latter allows you to outsource more infrastructure to their side, but upon closer inspection it also introduces other concerns: security, local debugging and scalability.

Out-of-the-box, the client libraries provide no infrastructure to facilitate periodic pull - polling - which is why I took a stab at doing it myself. It's still rough, not production tested, and doesn't consider a bunch of niche scenarios, but it should give you an idea of the direction it's going.

A high-level overview of the components looks like this.


A queue host hosts multiple queue consumers. Each queue consumer polls a queue for one type of message on a configurable interval, and dispatches dequeued messages to the relevant message handlers. After a message is handled, it is deleted from the queue. If something goes wrong, the error handler for this type of message is invoked, and the message automatically returns to the queue.

I'm going to look at each component, starting with the smallest, and slowly assemble them into bigger components.

Each queue consumer can be configured independently. For now, only the polling interval can be changed. By default it's one second, or 1000 milliseconds.
public interface IQueueConsumerConfiguration<T>
{
    int PollingInterval { get; }
}

public class QueueConsumerConfiguration<T> : IQueueConsumerConfiguration<T>
{     
    public int PollingInterval
    {
        get { return 1000; }
    }       
}
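Since the configuration is generic over the message type, nothing stops you from binding a specialized implementation for one particular message. A hypothetical example (MySlowMessage is made up for illustration):
public class MySlowMessage { }

public class MySlowMessageConsumerConfiguration : IQueueConsumerConfiguration<MySlowMessage>
{
    public int PollingInterval
    {
        // only poll this queue every five seconds
        get { return 5000; }
    }
}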
A queue can push messages, get raw messages, and delete them. The implementation makes use of the OSS IronIO client libraries.
public interface IQueue<T>
{
    bool TryGet(out Message message);

    void Delete(string messageId);

    void Push(T message);
}

public class Queue<T> : IQueue<T>
{
    private readonly IronIO.IronMQ _queue;

    public Queue(string projectId, string token)
    {
        Guard.ForEmptyString(projectId, "projectId");
        Guard.ForEmptyString(token, "token");

        var queueName = typeof(T).Name; 

        _queue = new IronMQ(queueName, projectId, token);
    }

    public bool TryGet(out Message message) 
    {
        message = _queue.Get();            

        return message != null;
    }

    public void Delete(string messageId)
    {
        Guard.ForNull(messageId, "messageId");

        _queue.Delete(messageId);
    }
    
    public void Push(T message)
    {
        Guard.ForNull(message, "message");

        _queue.Push(JsonConvert.SerializeObject(message));
    }
}
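Using the queue on its own then looks like this (a sketch - substitute your own project id, token and message type; MyMessage returns in the host example further down):
var queue = new Queue<MyMessage>("your_project_id", "your_token");

queue.Push(new MyMessage());

Message message;
if (queue.TryGet(out message))
{
    // process the message, then delete it so it doesn't
    // reappear on the queue once its timeout elapses
    queue.Delete(message.Id);
}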
A message dispatcher dispatches messages to the relevant handlers.
public interface IMessageDispatcher
{
    void Dispatch<T>(T message);
}

public class MessageDispatcher : IMessageDispatcher
{
    private readonly IKernel _kernel;

    public MessageDispatcher(IKernel kernel)
    {
        _kernel = kernel;            
    }

    public void Dispatch<T>(T message)
    {
        var handlers = _kernel.GetAll<IMessageHandler<T>>();
      
        foreach (var handler in handlers)
            handler.Handle(message);                        
    }
}
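The IMessageHandler<T> interface itself isn't listed in this post, but judging from the dispatcher it's a single Handle method; a trivial handler would then look like this:
public interface IMessageHandler<T>
{
    void Handle(T message);
}

public class MyMessageHandler : IMessageHandler<MyMessage>
{
    public void Handle(MyMessage message)
    {
        Console.WriteLine("Handled a MyMessage");
    }
}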
If something goes wrong pulling the message from the queue or handling it, the error handler will be invoked, passing in the exception and the raw message. Since it's possible that something is wrong with the message itself, I pass in the raw message, containing the serialized body and all of its metadata, like id, delay and expiration date.
public interface IErrorHandler<T>
{
    void Handle(Exception exception, Message message);
}

public class ErrorHandler<T> : IErrorHandler<T>
{       
    public void Handle(Exception exception, Message message)
    {
        // the default behaviour is to rethrow, so failures don't go unnoticed
        throw exception;
    }     
}
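That default simply rethrows, and since it does so from a timer callback, an unhandled exception will bring the process down. In production you'd probably want to log the failure instead, and let the message return to the queue through its timeout; a sketch:
public class LoggingErrorHandler<T> : IErrorHandler<T>
{
    public void Handle(Exception exception, Message message)
    {
        // deliberately swallow: the message wasn't deleted,
        // so it reappears on the queue once its timeout elapses
        Console.Error.WriteLine(
            "Failed to process {0} (message id: {1}): {2}",
            typeof(T).Name,
            message == null ? "n/a" : message.Id,
            exception);
    }
}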
Putting all these components to work together, we end up with a queue consumer. When a queue consumer is started, it instantiates a timer which tries to get a raw message from the queue on each tick. If there is one, it extracts the body, deserializes it into the message, dispatches it, and finally deletes the raw message from the queue. If something goes wrong along the way, the error handler is invoked, and the message automatically returns to the queue.
public interface IQueueConsumer<T> : IQueueConsumer where T : class
{
}

public interface IQueueConsumer : IDisposable
{
    void Start();
}

public class QueueConsumer<T> : IQueueConsumer<T> where T : class
{
    private readonly IQueue<T> _queue;
    private readonly IErrorHandler<T> _errorHandler;
    private readonly IQueueConsumerConfiguration<T> _queueConsumerConfiguration;
    private readonly IMessageDispatcher _messageDispatcher;
    private Timer _timer;        

    public QueueConsumer(
        IQueue<T> queue, 
        IErrorHandler<T> errorHandler,
        IQueueConsumerConfiguration<T> queueConsumerConfiguration,
        IMessageDispatcher messageDispatcher)
    {
        _queue = queue;
        _errorHandler = errorHandler;
        _queueConsumerConfiguration = queueConsumerConfiguration;
        _messageDispatcher = messageDispatcher;
    }    

    public void Start()
    {
        _timer = new Timer((x) =>
        {
            var message = (Message)null;

            try
            {
                if (!_queue.TryGet(out message))
                    return;

                var messageBody = (T)JsonConvert.DeserializeObject(
                    message.Body, typeof(T));

                _messageDispatcher.Dispatch<T>(messageBody);

                _queue.Delete(message.Id);
            }
            catch (Exception ex)
            {
                _errorHandler.Handle(ex, message);
            }                
        }, null, 0, _queueConsumerConfiguration.PollingInterval);            
    }

    public void Dispose()
    {
        if (_timer == null)
            return;

        _timer.Dispose();
    }
}
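One design note: System.Threading.Timer keeps firing on the interval whether or not the previous callback has finished, so slow handlers lead to overlapping ticks. A common alternative - not what's implemented here - is a one-shot timer that reschedules itself:
_timer = new Timer(x =>
{
    try
    {
        // pull from the queue and dispatch, as before
    }
    finally
    {
        // schedule the next tick only when this one has finished,
        // so slow handlers can't cause overlapping callbacks
        _timer.Change(_queueConsumerConfiguration.PollingInterval, Timeout.Infinite);
    }
}, null, 0, Timeout.Infinite);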
Since we have multiple queues to pull from, we can use a queue host to control multiple queue consumers at once. The queue host configuration decides which queue consumer to instantiate and start.
public class QueueHostConfiguration
{
    public QueueHostConfiguration(IEnumerable<Type> messageTypes)
    {
        Guard.ForNull(messageTypes, "messageTypes");

        MessageTypes = messageTypes;
    }

    public IEnumerable<Type> MessageTypes { get; private set; }
}

public class QueueHost : IDisposable
{
    private readonly IKernel _kernel;
    private readonly QueueHostConfiguration _configuration;

    private readonly List<IQueueConsumer> _consumers;

    public QueueHost(IKernel kernel, QueueHostConfiguration configuration)
    {
        _kernel = kernel;
        _configuration = configuration;

        _consumers = new List<IQueueConsumer>();
    }

    public void Start()
    {
        foreach (var messageType in _configuration.MessageTypes)
        {
            var queueConsumerType = typeof(IQueueConsumer<>).MakeGenericType(messageType);
            var queueConsumer = (IQueueConsumer)_kernel.Get(queueConsumerType);

            _consumers.Add(queueConsumer);

            queueConsumer.Start();
        }
    }

    public void Dispose()
    {
        if (_consumers == null)
            return;

        foreach (var consumer in _consumers)
            consumer.Dispose();
    }
}
In your application, you'll end up doing something like this to start the queue host. 
using (var host = new QueueHost(kernel, new QueueHostConfiguration(
    new[] { typeof(MyMessage) })))
{
    host.Start();

    Console.ReadLine();
}
All the components are glued together using Ninject and some conventions. 
public class Bootstrapper
{
    public void Run(IKernel kernel)
    {
        kernel.Bind(x => x
          .FromThisAssembly()
          .SelectAllClasses()
          .InheritedFrom(typeof(IMessageHandler<>))
          .BindAllInterfaces());

        kernel.Bind<Infrastructure.IMessageDispatcher>()
            .To<Infrastructure.MessageDispatcher>()
            .InTransientScope();
        kernel.Bind(x => x
            .FromAssemblyContaining(typeof(IQueue<>))
            .SelectAllClasses()
            .InheritedFrom(typeof(IQueue<>))
            .BindAllInterfaces()
            .Configure(y =>
            {
                y.WithConstructorArgument("projectId", z => { return "your_project_id"; });
                y.WithConstructorArgument("token", z => { return "your_token"; });
            }));
        kernel
            .Bind(typeof(Infrastructure.IQueueConsumer<>))
            .To(typeof(Infrastructure.QueueConsumer<>));
        kernel
            .Bind(typeof(Infrastructure.IQueueConsumerConfiguration<>))
            .To(typeof(Infrastructure.QueueConsumerConfiguration<>));
        kernel
            .Bind(typeof(Infrastructure.IErrorHandler<>))
            .To(typeof(MyErrorHandler<>));    
    }
}

This is what I've got for now. The next step is to do some more serious integration testing, and see what gives. There are two things I already kind of expect to run into: the maximum number of concurrent connections, and thread starvation (each timer tick takes up a thread pool thread). Anything else I'm going to run into?

The biggest disadvantage of opting for pull, one that's already obvious now, is the potential number of wasted HTTP requests. You could increase the polling interval, and thereby lower the number of requests, but this would hurt the throughput of message bursts. Something I'm considering right now is introducing a smart polling algorithm. Another option that would lower the number of requests is to pull batches instead of single messages from the queue. Implementing that one should be rather straightforward, yet improve things considerably.

Sunday, March 3, 2013

First IronMQ impressions

The first time I touched messaging was in the first few years of my professional life, working on software that supported fire departments in their day-to-day activities. The dispatching software would send messages to a proprietary broker, which in turn would forward them to interested subscribers: other dispatching clients, or services. To ensure availability, the broker component could fail over to a different machine, but that was about it. It didn't allow you to queue or retry messages; if you weren't up when the messages were forwarded, you would never receive them. When the brokers were both down, all messages would be lost; the clients had no out-of-the-box infrastructure to queue messages locally until a broker came back up. When things went haywire, and they occasionally did, these missing features would often leave us with an inconsistent state. More modern messaging software has solved these concerns though.

Although quite a few systems would benefit from asynchronous and loosely coupled messaging - especially to improve reliability and (perceived) performance, but also scalability - I still too seldom see or hear about projects that go that extra mile. Solutions often end up compromising on quality to avoid introducing that extra component and the unconventional questions that come with it. And this decision might be perfectly sound, because lots of factors are at play, not just technical ones. It's still a pity when you see solutions struggle to solve a problem in a decent way because they're stuck with synchronous communication.

Imagine a public website that's in the business of booking hotels. The offers they show to their customers are all based on data provided by third parties. Because it's so expensive to fetch this data, it's being cached, and as a result, it's stale seconds after fetching it. The moment a user confirms, they could fetch fresh data from the relevant third party to make sure the room is still available, but this process is error prone: the third party might be down, fetching the data is really slow, one of their own components might be down, and on conflicts they might want to compile a list of decent alternatives, which in turn might also be too slow when done on demand. One alternative could be to queue the booking, and process it in the back-end. Once they're done processing it, they can mail the user a confirmation, or an apology with a list of alternatives attached. By making this process asynchronous, they avoid the risk of a slow user experience and clumsy failures they can't recover from, which would make them lose business in the long run. But then again, they also take on the burden of extra infrastructure, new operational concerns and different questions. Trade-offs.

Anyways, in this blog post, I wanted to share some first impressions on IronMQ. I admittedly kind of accidentally discovered this service browsing through AppHarbor's add-ons. Here it is described as "a scalable cloud-based message queue".

Basically, IronMQ gives you a REST-enabled queue in the cloud. After authenticating, you can POST a new message to the queue. If you're unable to POST the message, you'll lose it, since there is no out-of-the-box support for retrying or persisting the message on the client. Once the message is posted, another process can GET the message, and DELETE it after successfully processing it. If something goes wrong while processing the message, the message returns to the queue, and is retried one minute later. Retries don't repeat infinitely though; messages expire (the default is 7 days, and the maximum is 30 days). This is exactly the kind of infrastructure we need to support the asynchronous booking scenario: have the customer put their booking on the queue, and one of our background processes will try to process it; if something goes wrong, we'll just keep retrying for a while.

The REST API is simple, and there are client libraries available for most popular languages. They don't provide much extra functionality though. Here's the gist.
var queue = new IronMQ(queueName, projectId, token);
queue.Push("{ \"hello\": \"world\" }");
var message = queue.Get(); // returns null when the queue is empty
queue.Delete(message.Id);
In this scenario you're responsible for pulling data from the queue. This is just one way to go at things though; another option is to let IronMQ push messages to your HTTP endpoints. While this allows you to outsource some infrastructure to their side, it raises other concerns:
  • Security: you might need to enable HTTPS, and provide an authentication mechanism.
  • Debugging: if you want to do some end-to-end integration testing on your local machine, you'll need to give your machine a public IP and set up something like DynDNS.
  • Scalability: depending on the expected message volume and the web stack you're rolling with, it might be more expensive to set up all these web servers than a few background workers.
Errors are handled quite elegantly; once you've processed the message successfully, make your endpoint return HTTP status code 200 or 202, and the message will be removed from the queue. Status code 202 is meant for long-running processes. If the response code is in the 400 or 500 range, the message returns to the queue to be retried later.
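If you were to go the push route, that contract is easy to honor from any web stack; a bare-bones sketch with System.Net.HttpListener (my illustration, not from IronMQ's documentation):
var listener = new HttpListener();
listener.Prefixes.Add("http://+:8080/messages/");
listener.Start();

while (true)
{
    var context = listener.GetContext();

    try
    {
        // read and process the pushed message body here

        context.Response.StatusCode = 200; // handled: remove it from the queue
    }
    catch (Exception)
    {
        context.Response.StatusCode = 500; // failed: it returns to the queue for a retry
    }

    context.Response.Close();
}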

When the expected volume of messages is rather small, it makes more sense to opt for push; you don't waste that many HTTP requests.

IronMQ makes it extremely simple to get started; go to their site, get a project id and a token, and start making HTTP calls. Do them yourself, or use one of the client APIs. But that also seems to be all you're going to get; there is no infrastructure that addresses operational concerns: error queues, retry strategies, local queues, and so on. IronMQ provides you with raw queueing infrastructure, not a framework.

Their site does give you a look into your queues though; you can't look at the messages, but you do get a nice overview.


I don't mind that it's not a turnkey solution though; I learn a lot from tinkering with this stuff. Solving problems and considering trade-offs for yourself is priceless.

Using HTTP for messaging still feels a bit quirky to me. For as long as I can remember, people have been making me believe HTTP is not the best fit for high-throughput messaging scenarios, and I do understand their motivations somewhat. But when even databases start to embrace HTTP, it's probably time to shake off the doctrine. It's so comfy not having to understand a new protocol, and HTTP just seems such a sensible thing to do when you're off-premise.

I'm going to conduct some more experiments this week; I'll see what gives.