Disruptor with MultiProducer

I hope you’re aware of the LMAX tool for fast in memory processing called disruptor. If not, it’s a must-see for nowadays architects. It’s nice to see your process eating messages with speeds ~10 millions/s.
One of the problems addressed in the latest release was a fast multi producer allowing one to instantiate multiple tasks publishing data for their consumers. I must admit that the simplicity of this robust part is astonishing. How one could handle claiming and publishing a one or a few items from the buffer ring? Its easy, claim it in a standard way using CAS operation to let other threads know about the claimed value and publish it. But how publish this kind of info? Here’s come the beauty of this solution:

  1. allocate a int array of the buffer ring length
  2. when items are published calculate their positions in the ring (sequence % ring.length)
  3. set the values in the helper int array with numbers of sequences (or values got from them)

This, with overhead of int array allows:

  1. waiting for producer by simply checking the value in the int array, if it matches the current number of buffer iteration
  2. publishing in the same order items were claimed
  3. publishing with no additionals CASes

Simple, powerful and fast.
Come, take a look at it: MultiProducerSequencer

Out of order commands

In the previous posts a simple mechanism of storing information needed for operation idempotence was introduced. A simple hash table, which state is transactionally saved with the state of object onto which the send operation was applied. How about receiving operations out of order? What if infrastructure (for instance, messaging system) will pass one operation earlier than the second, which in reality occurred earlier?

It’s time to make it explicit and start calling elements in the DDD manner. So for sake of reference, the object considered as the subject of an operation is an aggregate root. The operation is of course a message. The modeling assumes using the event sourcing as a storage for aggregates’ states.

Assume, that the aggregate, which the command is sent to, has a property called Version, incremented with each event applied on. Assume then, each command contains a version number, which is supposed to be equal to the aggregate’s version. If, during dispatch, these two values are different, an exception is thrown and command do not change the state of the aggregate. It’s a simple optimistic concurrency implementation, allowing discarding out of order commands sent to an object.

To make it more interesting, consider a sharded system, where specific aggregates are stored by different nodes (but for each aggregate there is one node where it is stored). An aggregate’s events (state changes) have to be propagated across all the nodes/shards in the same idempotent manner as commands are sent to aggregates. It’s easy to apply hashtable for each node and with using the very same key: aggregateId with version but it would mean storing all the pairs of aggregate identifiers with their versions, which could possibly bring down each of your nodes (or make you use GBs of memory). Can the trivial fact, that version is increased with every event on the aggregate, could be used for some optimization? You’ll see in the next entry.

What am I missing here?

If you’re in a startup and have a full-time job a the same moment as I do, that’s a post for you.

The initial startup pressure and tempo is huge. Focused on the features you can bring to life more and more of them. How often do you load your project, collapse it’s whole structure and ask questions:

  • What am I doing now?
  • How does it influence the rest of the system?
  • Is everything I need expressible in the current infrastructure and/or design?
  • Is it something, which I know from other projects missing?

It might seem that those opened questions are unneeded, to silly to ask, but from last time I asked them, they became a weekly routine. To show you, I’ll give you an example.

I write tests. As you already know, not always unit tests, but… During one of my write test/run/fix error cycles I noticed that it was quite hard to get all the information I needed. There was an assert failing and without debugging, only by viewing logs I had no idea what might have gone wrong. I reopened the project and did ‘what am I missing here?’ After global review of the whole solution I did found a thing. During all the feature based design I did a silly mistake not providing any logging in the application. You know, these _if_log_isDebugEnabled_ stuff (take a look in the NHibernate code). It took me no more than 10 minutes to spike it with some console appender and I rerun my tests. Ha! Some components did not log one or two operations and that was it.

It’s worth not to loose the (overused phrase) big picture and from time to time, stop providing features and ask these silly, ordinary questions.