Pearls: EventStore transaction log

For a while I have been thinking about presenting a few projects which are, in my opinion, real pearls. Let’s start with EventStore and one of its aspects: the transaction log.
If you’re not familiar with this project, EventStore is a stream database providing complex event processing. It’s oriented around streams of events, which can be easily aggregated or repartitioned with projections. With append-only streams and projections chasing them, one can build truly powerful logic around processing events.
One of the interesting aspects of EventStore is its storage engine. You can find a bit of description in here. ES does not abstract the storage away; the storage is a built-in part of the database itself. Let’s take a look at its parts before discussing it further:

Appending to the log
One of the building blocks of ES is the SEDA architecture: the communication within the db is based on publishing and consuming messages, which one can notice when reviewing StorageWriterService. The service subscribes to multiple messages, listed in its implementations of the IHandle interface. The question that arises is how often the service flushes its messages to disk. One can notice that the EnqueueMessage method, besides enqueuing incoming messages, counts the ones marked with the IFlushableMessage interface. What is it for?
Each Handle method calls Flush at its very end. Additionally, as EnqueueMessage increases the counter of messages requiring a flush, each Handle method decreases the counter when it handles a flushable message. This brings us to the conclusion that the counter equals 0 if and only if there are no more flushable messages in the queue.

Flushing the log
Once Flush is called, the log is flushed if any of the following holds:

  • the call was made with force=true (this never happens) or
  • there are no more flushable messages in the queue or
  • the configured interval since the last flush has passed

This provides a very powerful batching behavior. Under stress, the flush-to-be counter will be constantly greater than 0, so the log is flushed once per configured period. Under less stress, with no more flushables in the queue, ES will flush the log file after every message that requires it.
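
The three conditions above can be sketched as a tiny policy object. This is only my illustration of the rule, not the actual StorageWriterService code: the FlushPolicy class, its member names and the way time is passed in are all assumptions made for the example.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of the batching rule; not EventStore's real implementation.
public sealed class FlushPolicy
{
    private readonly TimeSpan _minFlushDelay; // assumed configuration value
    private DateTime _lastFlush;
    private int _pendingFlushable;            // flushable messages still in the queue

    public FlushPolicy(TimeSpan minFlushDelay, DateTime now)
    {
        _minFlushDelay = minFlushDelay;
        _lastFlush = now;
    }

    // Called when a flushable message is enqueued (possibly from another thread).
    public void OnEnqueued() => Interlocked.Increment(ref _pendingFlushable);

    // Called when a handler has processed a flushable message.
    public void OnHandled() => Interlocked.Decrement(ref _pendingFlushable);

    // True when the log should be flushed now; remembers the flush time if so.
    public bool ShouldFlush(DateTime now, bool force = false)
    {
        var flush = force
            || Volatile.Read(ref _pendingFlushable) == 0
            || now - _lastFlush >= _minFlushDelay;

        if (flush)
        {
            _lastFlush = now;
        }

        return flush;
    }
}
```

With a busy queue the counter stays above 0, so only the time condition triggers a flush; with an empty queue every handled message does.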

Acking the client
The final part of the processing is the acknowledgement. The client should be informed that a transaction has been persisted to disk. I spent a bit of time (with the help of Greg Young and James Nugent) chasing the place where the ack is generated. It does not happen in the StorageWriterService. What’s responsible for considering the message written then? Here comes the second part of the solution, the StorageChaser. In a dedicated thread, in an infinite loop, the ChaserIteration method is called. The method tries to read the next record from a chunk of unmanaged memory that the StorageWriterService has ensured to be flushed. Once the chaser finds a CommitRecord, written when a transaction is committed, it acks the client by publishing StorageMessage.CommitAck in the ProcessCommitRecord method. The message is translated to a client message confirming the commit and sent back to the client.
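
To make the chasing idea concrete, here is a heavily simplified sketch. It is not the StorageChaser code: the in-memory list standing in for the chunk, the RecordType enum and the LogChaser class are hypothetical, and the real chaser publishes messages instead of returning a list.

```csharp
using System.Collections.Generic;

// Hypothetical record kinds; the real log has richer record types.
public enum RecordType { Prepare, Commit }

// Hypothetical sketch: follows the writer and reports commits it has passed.
public sealed class LogChaser
{
    private readonly IList<RecordType> _log; // stand-in for the in-memory chunk
    private int _position;                   // next record the chaser will read

    public LogChaser(IList<RecordType> log)
    {
        _log = log;
    }

    // One iteration: reads only records already flushed by the writer and
    // returns the positions of commit records, i.e. the acks to send.
    public List<int> ChaseUpTo(int flushedCount)
    {
        var acks = new List<int>();
        while (_position < flushedCount && _position < _log.Count)
        {
            if (_log[_position] == RecordType.Commit)
            {
                acks.Add(_position);
            }

            _position++;
        }

        return acks;
    }
}
```

The important property is that the chaser never reads past the flushed position, so an ack is only produced for a commit that is already on disk.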

Sum up
One cannot deny the beauty and simplicity of this solution. One component tries to flush as fast as possible, or batches a few messages if it cannot endure the pressure. Another one waits for the position up to which the file is flushed to advance. Once it changes, it reads the record (from the in-memory chunk matched with the file on disk), processes it and sends acks. Simple and powerful.

Out of order commands

In the previous posts a simple mechanism for storing the information needed for operation idempotence was introduced: a simple hash table whose state is saved transactionally together with the state of the object to which the operation was applied. How about receiving operations out of order? What if the infrastructure (for instance, a messaging system) delivers one operation before another which in reality occurred earlier?

It’s time to make it explicit and start naming things in the DDD manner. So, for the sake of reference, the object considered as the subject of an operation is an aggregate root. The operation is of course a message. The modeling assumes using event sourcing as the storage for aggregates’ states.

Assume that the aggregate to which the command is sent has a property called Version, incremented with each event applied to it. Assume also that each command contains a version number, which is supposed to be equal to the aggregate’s version. If, during dispatch, these two values differ, an exception is thrown and the command does not change the state of the aggregate. It’s a simple optimistic concurrency implementation, which allows discarding out-of-order commands sent to an object.
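
A minimal sketch of that check, assuming a made-up Account aggregate and a made-up ConcurrencyException; neither name comes from a real framework, and events are reduced to a direct state change to keep the example short.

```csharp
using System;

// Hypothetical exception signalling a version mismatch.
public sealed class ConcurrencyException : Exception
{
    public ConcurrencyException(int expected, int actual)
        : base($"Expected version {expected}, but the aggregate is at version {actual}.")
    {
    }
}

// Hypothetical aggregate root used only for illustration.
public sealed class Account
{
    public int Version { get; private set; }
    public decimal Balance { get; private set; }

    // The command carries the version the sender believes the aggregate has.
    public void Deposit(int expectedVersion, decimal amount)
    {
        if (expectedVersion != Version)
        {
            // Out of order or duplicated command: state stays untouched.
            throw new ConcurrencyException(expectedVersion, Version);
        }

        Apply(amount);
    }

    // Applying an event changes the state and bumps the version.
    private void Apply(decimal amount)
    {
        Balance += amount;
        Version++;
    }
}
```

Sending the same command twice with the same expected version fails the second time, because the first one has already moved the version forward.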

To make it more interesting, consider a sharded system, where specific aggregates are stored on different nodes (but for each aggregate there is exactly one node where it is stored). An aggregate’s events (state changes) have to be propagated across all the nodes/shards in the same idempotent manner as commands are sent to aggregates. It’s easy to keep a hashtable on each node, using the very same key (aggregateId with version), but it would mean storing all the pairs of aggregate identifiers and their versions, which could possibly bring down each of your nodes (or make you use GBs of memory). Can the trivial fact that the version is increased with every event on the aggregate be used for some optimization? You’ll see in the next entry.

Idempotence, pt. 2

In the previous post a few operations were considered with regard to whether they are idempotent or not. For the sake of reference, here they are:

  • Marked as default
  • Money transfer ‘500$’ ordered to ‘x’ account
  • Label ‘leave sth for the future month’ added

If we consider ‘idempotent’ to mean an operation which can be applied multiple times in a row with the same result, then all the operations overriding previous values of some properties are idempotent. Having some entity marked as default 5 times does not change the fact that it is default. That’s for sure. What about provisioning ‘x’ account with 500$? Can this type of operation be reapplied multiple times? Of course not, because it does not override any property; it changes the state by interacting with the previous one. The same goes for ‘labeling’, unless some compensation is introduced (selecting only unique labels before saving, which would allow reapplying).

What if you want your system to be resistant to operations resent multiple times? The simplest solution is to add a unique identifier to each operation and store the identifiers in a lookup (hashtable). Each time an operation arrives, the lookup is checked to see whether this operation was already processed. If so, skip it.

There is one additional condition: the lookup has to be transactional with the storage in which you save the states. This condition is a simple ‘all-or-none’ for storing the result of an operation together with the fact that this specific operation was already applied. Otherwise, if the lookup were updated first and storing the state after the operation failed, no change would be saved, yet the operation would be considered applied. The same applies to a situation where the lookup is updated at the very end: the operation result is saved, adding info about the operation to the lookup fails, and the next time the same operation arrives it is applied one more time. That said, the lookup must be transactional with the medium where the state is saved.
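
Here is a minimal in-memory sketch of the lookup. The IdempotentAccount class is hypothetical, and the lock only stands in for the storage transaction: in a real system the state and the processed identifier would be written in one database transaction.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; the lock plays the role of a storage transaction.
public sealed class IdempotentAccount
{
    private readonly object _sync = new object();
    private readonly HashSet<Guid> _processed = new HashSet<Guid>();

    public decimal Balance { get; private set; }

    // Returns true when the operation was applied, false when it was a resend.
    public bool Transfer(Guid operationId, decimal amount)
    {
        lock (_sync)
        {
            if (_processed.Contains(operationId))
            {
                return false; // already applied, skip it
            }

            // Both changes happen together: all-or-none.
            Balance += amount;
            _processed.Add(operationId);
            return true;
        }
    }
}
```

A resent transfer with the same identifier leaves the balance untouched, which turns the non-idempotent ‘add 500$’ into something safe to retry.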

Composition vs derivation

Assume you’re writing some reports in your application. You’ve just created the third controller covering some kind of reporting and it seems that all three controllers have very similar code, modulo the type passed as the entity type to your NH ISession.QueryOver() or another data source. It seems that if the method creating the report base were generic, it could be used in all the cases. You want to extract it, make it clearer and stop repeating yourself. What would you do?

Derivation
The very first thing is to extract a method in each of the controllers. Now they seem almost the same. A place is needed where the method can be easily moved. How about a supertype? Let’s create a controller, name it in a fashionable way, for instance ReportControllerBase, and move the method in there. Now you can easily remove the methods from the deriving controllers. Yeah! It’s reusable: everyone writing a report can derive from ReportControllerBase and use its methods to speed up the task.

Composition
The very first step is exactly the same: the method must be extracted to see the common code. Once it’s done, you notice that the whole method has only a few dependencies which can be easily pushed to parameters, for instance: ISession, the entity type passed to the query, the value used in some complex where clause, etc. You change all the field and property usages to parameters, which allows this method to be static. You create a static method and turn it into an extension method of a session. The refactoring is done: you can easily use this method in all the controllers by simply calling the extension on a session.
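
A sketch of where such a refactoring might end up. To keep the example runnable without NHibernate, IQueryable<T> stands in for the session here; the ReportQueries class, the ReportFor method and its parameters are names I made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Hypothetical extension class; IQueryable<T> is a stand-in for the NH session.
public static class ReportQueries
{
    // All former fields and properties became parameters, so the method is static.
    public static IList<T> ReportFor<T>(
        this IQueryable<T> source,
        Expression<Func<T, bool>> filter,
        int maxRows)
    {
        return source
            .Where(filter)   // the 'complex where clause' passed in by the caller
            .Take(maxRows)
            .ToList();
    }
}
```

Any controller can now call something like orders.ReportFor(o => o.Total > 100, 50) (with hypothetical orders and Total) without deriving from a common base class.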

What’s the difference and why composition should be preferred
If you use C# or Java you should always be aware of one limitation: you can derive from only one type. Spending this ‘once in a lifetime’ choice on a simple functionality extraction is, for me, the wrong way. Using composition, and deriving only when you truly see that one type is another type, is the right way to go. In the next post I’ll write about ASP MVC Detached Actions, a simple mechanism that lets you derive your controllers only when it is needed and delegate common actions without derivation.

Simplicity of your architecture

A few days ago I watched a nice presentation published by InfoQ called Simplicity Architect, which made me think once more about the design decisions I’ve been making over the last year. Dan North, the speaker, reminded me of the major problem every architect, or I’d say every designer, has. The problem is complexity lurking in the dark corners of each solution. I can remember times when going from zero to a working example took my team more than one month because of the db creation, design, and other stuff which you can qualify as things your business doesn’t understand. During the very last startup, it took me only 2 hours to provide the very basic set of functionalities: viewing and filtering a list of entities, editing them and so on. I did use a few tools (NHibernate, log4net, Unity and my infrastructure part injecting it all into MVC in my way), but it took only 2 hours to have a working example and it was _simple_ in terms of design. I liked it, and so did the business owner.

Hope you don’t get too complex either.

Project structure

I’ve been working on a few quite big projects recently. The first one I was dropped into had been designed a few months before my arrival and had a solution consisting of almost 60 projects. The references were cascading, with each project referencing all the previous ones. The web project depended on all of the projects, and had nothing in common with terms like dependency injection, ORM or best practices. All the data access code was written using SqlCommands, with caching thrown in in multiple places. The same went for other blocks you can imagine, such as validation, logging, gathering statistics, etc. Reviewing the code made me think about NIH syndrome.

The second project was designed to handle a lot of stuff, for instance lifetime management (a very special case) and very domain-specific workflows. The design was about 80% done when I entered the project. We tried not to write much code, using existing frameworks for different purposes: NHibernate, log4net, NServiceBus, the Irony parser (for a domain-specific language). When the first version was established, the solution had 16 projects. Two weeks ago I made a major refactoring, removing a few service and code-based (yet another abstraction) boundaries, creating a more integrated, yet _not_more_ coupled system. The final solution performed better and had a much simpler deployment. It consisted of 11 projects.

The very last solution consisted of 6 projects:

  • Web (having reference to NHibernate and Model)
  • Web.Config (infrastructure stuff, all the DI registrations, referencing all libraries and setting up controller factory in ControllerBuilder and default ModelBinder, infrastructure service implementations)
  • Model (entities, factories, services, types – like email, domain events and their handlers)
  • Model.Impl (implementation of interfaces from Model split in the Model’s manner)
  • Model.Persistence (all the NH event listeners, Fluent NH mappings, Solr services)
  • Model.Presentation (simple denormalized readonly objects to be used for queries result, possibly with Solr)

Are my solutions becoming too minimal? Judging by the last solution as it is being built, it does not seem so. The responsibilities are clean and everything fits easily into this separation. How do you find it? Any ideas?

Domain events with Unity Container extension

I do like domain events, perfectly described by Udi. I’d like to share my implementation of this pattern using Unity (my preferred container), which seems to be simple, short and still powerful. I assume that you have read Udi’s article carefully. First and foremost: the domain interfaces

    /// <summary>
    /// The markup interface for any system event.
    /// </summary>
    public interface IEvent
    {
    }

    /// <summary>
    /// The interface of a handler for events of type <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">The type of the event to handle.</typeparam>
    public interface IEventHandler<T>
        where T : IEvent
    {
        /// <summary>
        /// Handles the specified @event.
        /// </summary>
        /// <param name="event">The @event.</param>
        void Handle(T @event);
    }

    /// <summary>
    /// The interface of event manager, allowing raising events based on the <see cref="IEvent"/>.
    /// </summary>
    public interface IEventManager
    {
        /// <summary>
        /// Raises the specified event, not requiring, 
        /// that it is handled by any handler.
        /// </summary>
        /// <typeparam name="T">The type of the event.</typeparam>
        /// <param name="event">The @event.</param>
        /// <remarks>
        /// The method iterates through all of the registered handlers for the event of type <typeparamref name="T"/>.
        /// </remarks>
        void Raise<T>(T @event)
            where T : IEvent;
    }

I assume the usage of dependency injection, with the IEventManager implementation being injected. As for the IEventManager implementation, it’s fairly simple and located in a project that knows the Unity assembly (Infrastructure in my case). It’s worth mentioning that I consider handlers to be transient objects, constructed for and discarded immediately after event handling.

    /// <summary>
    /// The implementation of event manager using the container to resolve handlers.
    /// </summary>
    public class EventManager : IEventManager
    {
        private readonly IUnityContainer _container;

        [DebuggerStepThrough]
        public EventManager(IUnityContainer container)
        {
            _container = container;
        }

        [DebuggerStepThrough]
        public void Raise<T>(T @event)
            where T : IEvent
        {
            var handlers = _container.ResolveAll<IEventHandler<T>>();

            foreach (var handler in handlers)
            {
                try
                {
                    handler.Handle(@event);
                }
                finally
                {
                    _container.Teardown(handler);
                }
            }
        }
    }

Last but not least: how to register all the handlers in the container? I wrote a small Unity extension providing assembly scan functionality, which can be found below. There are two mouthful method names in the code:

  • GetFirstClosedGenericInterfaceBasedOnOpenGenericInterface, which tries to do what its name says :P
  • RegisterInstanceWithSingletonLifetimeManager, an extension method using an I-have-nothing-to-do-with-monitors LifetimeManager for setting up the singletons

The code itself:

    /// <summary>
    /// The <see cref="UnityContainer"/> extension registering all the event handlers
    /// and unity-based implementation of <see cref="IEventManager"/>.
    /// </summary>
    /// <remarks>
    /// The event handlers are registered as classes with transient lifetime. 
    /// Their instances are torn down immediately after usage.
    /// </remarks>
    public class EventUnityContainerExtension : UnityContainerExtension
    {
        protected override void Initialize()
        {
            var eventManager = new EventManager(Container);
            Container.RegisterInstanceWithSingletonLifetimeManager(eventManager);
        }
        /// <summary>
        /// Registers all the event handlers from assembly of the passed type <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">The type which assembly should be scanned.</typeparam>
        /// <returns>This instance.</returns>
        public EventUnityContainerExtension RegisterHandlersFromAssemblyOf<T>()
        {
            return RegisterHandlersFromAssembly(typeof(T).Assembly);
        }

        /// <summary>
        /// Registers all the event handlers from all <paramref name="eventHandlersAssemblies"/>.
        /// </summary>
        /// <param name="eventHandlersAssemblies">List of assemblies to scan.</param>
        /// <returns>This instance.</returns>
        public EventUnityContainerExtension RegisterHandlersFromAssemblies(params Assembly[] eventHandlersAssemblies)
        {
            foreach (var eventHandlersAssembly in eventHandlersAssemblies)
            {
                RegisterHandlersFromAssembly(eventHandlersAssembly);
            }

            return this;
        }

        private EventUnityContainerExtension RegisterHandlersFromAssembly(Assembly assembly)
        {
            foreach (var type in assembly.GetTypes())
            {
                if (type.IsInterface || type.IsAbstract)
                {
                    continue;
                }

                var handlerInterface = type.GetFirstClosedGenericInterfaceBasedOnOpenGenericInterface(typeof(IEventHandler<>));
                if (handlerInterface == null)
                {
                    continue;
                }

                // the type is registered with a name, because only named registrations can be resolved by unity.ResolveAll method
                Container.RegisterType(handlerInterface, type, type.FullName, new TransientLifetimeManager());
            }

            return this;
        }
    }

Simple and powerful, isn’t it? :)
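
The GetFirstClosedGenericInterfaceBasedOnOpenGenericInterface helper is not shown above. One possible way to write it with plain reflection is sketched below; treat it as an assumption about how such a helper could look rather than the original code, and note that the TypeExtensions class name is made up.

```csharp
using System;
using System.Linq;

// Hypothetical helper class; only the method name comes from the post.
public static class TypeExtensions
{
    // Returns the first interface of 'type' that is a closed version of
    // 'openGenericInterface' (e.g. IEventHandler<SomeEvent> for IEventHandler<>),
    // or null when the type implements no such interface.
    public static Type GetFirstClosedGenericInterfaceBasedOnOpenGenericInterface(
        this Type type,
        Type openGenericInterface)
    {
        return type
            .GetInterfaces()
            .FirstOrDefault(i =>
                i.IsGenericType &&
                !i.ContainsGenericParameters &&
                i.GetGenericTypeDefinition() == openGenericInterface);
    }
}
```

For a handler class implementing IEventHandler<SomeEvent>, the call with typeof(IEventHandler<>) would return typeof(IEventHandler<SomeEvent>), which is exactly the type the extension registers the handler under.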