Pearls: EventStore transaction log

I thought for a while about presenting a few projects which are, in my opinion, real pearls. Let’s start with EventStore and one of its aspects: the transaction log.
If you’re not familiar with this project, EventStore is a stream database providing complex event processing. It’s oriented around streams of events, which can be easily aggregated or repartitioned with projections. Based on ever-appended streams and projections chasing the streams, one can build truly powerful logic around processing events.
One of the interesting aspects of EventStore is its storage engine. You can find a bit of description in here. ES does not abstract the storage away; the storage is a built-in part of the database itself. Let’s take a look at its parts before discussing it further:

Appending to the log
One of the building blocks of ES is the SEDA architecture – communication within the db is based on publishing and consuming messages, which one can notice when reviewing StorageWriterService. The service subscribes to multiple messages, listed in its implementations of the IHandle interface. The question that arises is how often the service flushes its messages to disk. One can notice that the EnqueueMessage method, besides enqueuing incoming messages, counts the ones marked with the IFlushableMessage interface. What is it for?
Each Handle method calls Flush at its very end. Additionally, as EnqueueMessage increases the counter of messages requiring a flush, each Handle method decreases the counter when it handles a flushable message. This brings us to the conclusion that the counter equals 0 if and only if there are no more flushable messages in the queue.

Flushing the log
Once Flush is called, it checks whether:

  • the call was made with force=true (this never happens) or
  • there are no more flushable messages in the queue or
  • the given time since the last flush has passed

This provides a very powerful batching behavior. Under stress, the flush-to-be counter will be constantly greater than 0, so the log is flushed once every given period of time. Under less stress, with no more flushables in the queue, ES will flush after every message that requires the log file to be flushed.
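The flush decision described above can be sketched in a few lines. This is only a toy model, not EventStore’s actual code: the class name BatchingWriter, the min_flush_delay parameter and its default value, and the injected clock are all assumptions made for illustration.

```python
import time


class BatchingWriter:
    """Toy model of the flush decision; not EventStore's actual code."""

    def __init__(self, min_flush_delay=0.002, clock=time.monotonic):
        self.min_flush_delay = min_flush_delay  # assumed value, in seconds
        self.clock = clock
        self.flush_messages_in_queue = 0
        self.last_flush = clock()
        self.flush_count = 0

    def enqueue(self, flushable):
        # Mirrors EnqueueMessage: count messages that will require a flush.
        if flushable:
            self.flush_messages_in_queue += 1

    def handle(self, flushable):
        # Mirrors a Handle method: consume the message, then try to flush.
        if flushable:
            self.flush_messages_in_queue -= 1
        # ... the record would be appended to the log buffer here ...
        return self.flush()

    def flush(self, force=False):
        now = self.clock()
        if (force
                or self.flush_messages_in_queue == 0
                or now - self.last_flush >= self.min_flush_delay):
            # ... the real flush to disk would happen here ...
            self.last_flush = now
            self.flush_count += 1
            return True
        return False  # keep batching: more flushables are waiting
```

With three flushable messages queued and no time passing, only the third call to handle flushes, so the three writes share one flush.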

Acking the client
The final part of the processing is the acknowledgement. The client should be informed once a transaction has been persisted to disk. I spent a bit of time (with the help of Greg Young and James Nugent) chasing the place where the ack is generated. It does not happen in the StorageWriterService. What’s responsible for considering the message written, then? Here comes the second part of the solution, the StorageChaser. In a dedicated thread, in an infinite loop, the ChaserIteration method is called. The method tries to read the next record from a chunk of unmanaged memory that was ensured to be flushed by the StorageWriterService. Once the chaser finds a CommitRecord, written when a transaction is committed, it acks the client by publishing StorageMessage.CommitAck in the ProcessCommitRecord method. The message will be translated to a client message confirming the commit and sent back to the client.

Sum up
One cannot deny the beauty and simplicity of this solution. One component tries to flush as fast as possible, or batches a few messages if it cannot endure the pressure. Another one waits for the position up to which the file is flushed to increase. Once it changes, it reads the record (from the in-memory chunk matched with the file on disk), processes it and sends acks. Simple and powerful.
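The writer/chaser split can be illustrated with a single-threaded toy model. The names Log, Chaser and flushed_position are hypothetical and records are plain (kind, payload) tuples; the real StorageChaser runs in its own thread and reads from unmanaged memory.

```python
class Log:
    """Stands in for the in-memory chunk plus the flushed position."""

    def __init__(self):
        self.records = []
        self.flushed_position = 0  # records below this index are on disk

    def append(self, record):
        self.records.append(record)

    def flush(self):
        # The writer advances the flushed position only after a flush.
        self.flushed_position = len(self.records)


class Chaser:
    def __init__(self, log, publish):
        self.log = log
        self.publish = publish
        self.position = 0

    def iteration(self):
        """Process one flushed record, if any; return True if work was done."""
        if self.position >= self.log.flushed_position:
            return False  # nothing new has been flushed yet
        kind, payload = self.log.records[self.position]
        self.position += 1
        if kind == "commit":
            # Ack only records that are known to be flushed.
            self.publish(("CommitAck", payload))
        return True
```

The chaser never looks past flushed_position, so an ack can only be produced for a commit that has already reached the disk.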

Reconciliation between systems

From time to time a system is replaced with another system capable of doing more, or of doing the same thing better. It’s quite common to ask whether any data is lost or whether the new system preserves the needed behaviors of the old one. Sometimes it’s a human-application comparison, when a procedure followed by people is replaced with an application; sometimes it’s a question of an old system vs a new system.

Cassandra integrity check
Let’s presume one migrates some old-fashioned SQL system to a Cassandra-based solution, given the following:

  1. total payload of daily data is quite high
  2. data are written to the Cassandra cluster (more than one node) with ConsistencyLevel Local Quorum
  3. there should be a possibility to check whether all the data stored in the previous system are written to the new one

After a bit of consideration one can propose that, as the data are written with LocalQuorum, they should be queried with the same level and matched against the old solution. This would ensure that the data which have been written are the data being read (the famous R + W > N). This could cost a lot, as each query hits a quorum of replicas (⌊N/2⌋ + 1 for a replication factor of N), streaming the daily payload through the network twice: once to the coordinator and a second time to the client. Can we do this better?
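The arithmetic behind R + W > N is small enough to check by hand. In this sketch, quorum and overlap_guaranteed are hypothetical helper names and n stands for the replication factor.

```python
def quorum(n):
    """Number of replicas in a quorum for replication factor n."""
    return n // 2 + 1


def overlap_guaranteed(r, w, n):
    """True if every read of r replicas must hit at least one of the
    w replicas that acknowledged the write (the R + W > N rule)."""
    return r + w > n


n = 3
print(quorum(n))                                     # 2
print(overlap_guaranteed(quorum(n), quorum(n), n))   # True: quorum read, quorum write
print(overlap_guaranteed(1, quorum(n), n))           # False: read with One
```

With a replication factor of 3, a quorum write followed by a quorum read always overlaps in at least one replica, while a read with One gives no such guarantee on its own.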

Possibly faster integrity check
How about using a Consistency Level of One? How can this be done while ensuring that the given node contains all the needed data? By running repair in your local data center on each node, one can ensure that each node contains all the data it’s responsible for. Then, querying with One is ok. What’s important about nodetool repair is that it does not stream data if it’s not needed. The information sent to check whether the given node contains all the data is a Merkle tree, a tree made of hashes of hashes of hashes of… Sending this structure is cheap and doesn’t load your network so much.
If you think (or know) that running repairs daily is a heavy task for your cluster, you’ll be happy to read about Cassandra 2.1 repair improvements, including incremental repairs.
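To see why comparing Merkle trees is cheap, here is a minimal sketch that reduces a list of rows to one root hash. It is a simplification: Cassandra builds its trees over token ranges and compares subtrees to find the ranges that differ, whereas this hypothetical merkle_root helper only compares the roots.

```python
import hashlib


def _h(data):
    return hashlib.sha256(data).digest()


def merkle_root(rows):
    """Root hash over a list of byte strings (rows in a fixed order)."""
    level = [_h(row) for row in rows] or [_h(b"")]
    while len(level) > 1:
        if len(level) % 2:  # duplicate the last hash on odd-sized levels
            level.append(level[-1])
        # Each parent is the hash of its two children's hashes.
        level = [_h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


node_a = [b"row-1", b"row-2", b"row-3"]
node_b = [b"row-1", b"row-2", b"row-3"]
node_c = [b"row-1", b"row-2", b"row-3-stale"]

print(merkle_root(node_a) == merkle_root(node_b))  # True: nothing to stream
print(merkle_root(node_a) == merkle_root(node_c))  # False: some range differs
```

Two nodes holding the same rows exchange 32 bytes instead of the rows themselves; data needs to be streamed only when the hashes differ.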

So stop complaining about your good old-fashioned RDBMS and get yourself a new shiny cluster of Cassandra nodes :)

Polyglot programming

Have you ever been in a situation where you didn’t have enough tooling in your preferred environment? What did you do back then? Did you use already known tools or search for something better?
Recently I’ve been involved in a project which truly pushed me out of my comfort zone. I like .NET, but there’s not enough tooling to provide a highly scalable, performant, failure-resistant environment. We moved to Java, Storm, Cassandra, Zookeeper – JVM all over the place. It wasn’t easy, but it was interesting and eye-opening. Having so many libraries focused on resolving specific concerns, instead of the .NET framework-oriented paradigm, was very refreshing.
Was it worth it? Yes. Was it good for self-development? Yes. Will I reach for every new library/language? Certainly not. The most important thing I’ve learned so far is that being adaptable and aware of the available tools matters more than anything else. Mistakes were made, that’s for sure, but the overall solution is growing in the right direction.
After all, it’s survival of the fittest, isn’t it?

Disruptor with MultiProducer

I hope you’re aware of the LMAX tool for fast in-memory processing called disruptor. If not, it’s a must-see for today’s architects. It’s nice to see your process eating messages at speeds of ~10 million/s.
One of the problems addressed in the latest release was a fast multi producer, allowing one to instantiate multiple tasks publishing data for their consumers. I must admit that the simplicity of this robust part is astonishing. How could one handle claiming and publishing one or a few items from the ring buffer? It’s easy: claim it in the standard way, using a CAS operation to let other threads know about the claimed value, and publish it. But how to publish this kind of info? Here comes the beauty of this solution:

  1. allocate an int array of the ring buffer length
  2. when items are published, calculate their positions in the ring (sequence % ring.length)
  3. set the values in the helper int array with the numbers of the sequences (or values derived from them)

This, at the cost of the extra int array, allows:

  1. waiting for a producer by simply checking whether the value in the int array matches the current number of the buffer iteration
  2. making items visible to consumers in the same order they were claimed
  3. publishing with no additional CASes

Simple, powerful and fast.
Come, take a look at it: MultiProducerSequencer
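A minimal sketch of the idea, under loud assumptions: the class name ToyMultiProducerRing is made up, a lock stands in for the CAS on the claim cursor (Python has no CAS primitive), and the gating that stops producers from lapping slow consumers is omitted entirely. The helper array stores the buffer iteration number (sequence // size), one of the "values derived from the sequence".

```python
import threading


class ToyMultiProducerRing:
    """Toy model of the availability array; not the LMAX implementation."""

    def __init__(self, size):
        self.size = size
        self.available = [-1] * size  # helper int array, one entry per slot
        self._next = 0
        self._claim_lock = threading.Lock()  # stands in for the CAS

    def claim(self):
        # The only contended step: hand out the next sequence number.
        with self._claim_lock:
            seq = self._next
            self._next += 1
            return seq

    def publish(self, seq):
        # No CAS needed: a claimed sequence owns its slot exclusively.
        self.available[seq % self.size] = seq // self.size

    def is_available(self, seq):
        # The slot is ready if it holds the iteration number of this sequence.
        return self.available[seq % self.size] == seq // self.size

    def highest_published(self, lower_bound):
        """Last sequence of the contiguous published run starting at
        lower_bound, or lower_bound - 1 if lower_bound is not published."""
        seq = lower_bound
        while self.is_available(seq):
            seq += 1
        return seq - 1
```

If sequences 0, 1 and 2 are claimed and only 2 is published, highest_published(0) returns -1: a consumer sees nothing until 0 is published, even though the producers never waited for each other.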

Out of order commands

In the previous posts a simple mechanism for storing the information needed for operation idempotence was introduced: a simple hash table, whose state is transactionally saved with the state of the object to which the sent operation was applied. How about receiving operations out of order? What if the infrastructure (for instance, a messaging system) delivers one operation before another which in reality occurred earlier?

It’s time to make it explicit and start naming elements in the DDD manner. So, for the sake of reference, the object considered as the subject of an operation is an aggregate root. The operation is, of course, a message. The model assumes event sourcing as the storage for aggregates’ states.

Assume that the aggregate which the command is sent to has a property called Version, incremented with each event applied to it. Assume then that each command contains a version number, which is supposed to be equal to the aggregate’s version. If, during dispatch, these two values are different, an exception is thrown and the command does not change the state of the aggregate. It’s a simple optimistic concurrency implementation, allowing out-of-order commands sent to an object to be discarded.
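The version check can be sketched as follows. The names Aggregate, ConcurrencyError and dispatch are illustrative only, and events are plain strings kept in memory instead of an event store.

```python
class ConcurrencyError(Exception):
    """Raised when a command targets a version the aggregate is not at."""


class Aggregate:
    def __init__(self):
        self.version = 0
        self.events = []

    def apply(self, event):
        self.events.append(event)
        self.version += 1  # incremented with each applied event

    def dispatch(self, expected_version, event):
        # Optimistic concurrency: reject commands built against another version.
        if expected_version != self.version:
            raise ConcurrencyError(
                f"expected version {expected_version}, "
                f"aggregate is at {self.version}")
        self.apply(event)
```

A command carrying version 0 succeeds against a fresh aggregate; a second command that also carries version 0 raises ConcurrencyError and leaves the state untouched.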

To make it more interesting, consider a sharded system, where specific aggregates are stored by different nodes (but for each aggregate there is one node where it is stored). An aggregate’s events (state changes) have to be propagated across all the nodes/shards in the same idempotent manner as commands are sent to aggregates. It’s easy to apply a hashtable on each node using the very same key, aggregateId with version, but it would mean storing all the pairs of aggregate identifiers with their versions, which could possibly bring down each of your nodes (or make you use GBs of memory). Can the trivial fact that the version is increased with every event on the aggregate be used for some optimization? You’ll see in the next entry.

What am I missing here?

If you’re in a startup and have a full-time job at the same time, as I do, this is a post for you.

The initial startup pressure and tempo are huge. Focused on features, you can bring more and more of them to life. How often do you load your project, collapse its whole structure and ask questions:

  • What am I doing now?
  • How does it influence the rest of the system?
  • Is everything I need expressible in the current infrastructure and/or design?
  • Is something that I know from other projects missing?

It might seem that those open questions are unneeded, too silly to ask, but since the last time I asked them, they have become a weekly routine. To show you, I’ll give you an example.

I write tests. As you already know, not always unit tests, but… During one of my write test/run/fix error cycles I noticed that it was quite hard to get all the information I needed. There was an assert failing and, without debugging, only by viewing logs, I had no idea what might have gone wrong. I reopened the project and asked ‘what am I missing here?’ After a global review of the whole solution I did find a thing. During all the feature-based design I had made a silly mistake by not providing any logging in the application. You know, this _if_log_isDebugEnabled_ stuff (take a look in the NHibernate code). It took me no more than 10 minutes to spike it with some console appender and I reran my tests. Ha! Some components did not log one or two operations and that was it.

It’s worth not losing the (overused phrase) big picture and, from time to time, stopping feature work to ask these silly, ordinary questions.

Idempotence, pt. 2

In the previous post a few operations were considered as to whether they are (not) idempotent. For the sake of reference, here they are:

  • Marked as default
  • Money transfer ‘500$’ ordered to ‘x’ account
  • Label ‘leave sth for the future month’ added

If we consider ‘idempotent’ as an operation which can be applied multiple times in a row with the same result, then all the operations overriding previous values of some properties are idempotent. Having some entity marked as default 5 times does not change the fact that it is default. That’s for sure. What about provisioning ‘x’ account with 500$? Can this type of operation be reapplied multiple times? Of course not, because it does not override any property; it changes the state by interacting with the previous one. The same goes for ‘labeling’, unless some compensation is introduced (selecting only unique labels before saving, which would allow reapplying).
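The difference between the two kinds of operations shows up as soon as each is applied twice. The functions mark_as_default and transfer and the dictionary shapes below are made up for this illustration.

```python
def mark_as_default(entity):
    # Overrides a property: applying it again changes nothing.
    return {**entity, "is_default": True}


def transfer(account, amount):
    # Builds on the previous state: every application adds again.
    return {**account, "balance": account["balance"] + amount}


entity = {"name": "address-1", "is_default": False}
account = {"id": "x", "balance": 0}

once = mark_as_default(entity)
twice = mark_as_default(once)
print(once == twice)  # True: idempotent

once = transfer(account, 500)
twice = transfer(once, 500)
print(once == twice)  # False: the balance is 500 vs 1000
```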

What if you want your system to be resistant to operations resent multiple times? The simplest solution is to add a unique identifier to each operation and store the identifiers in a lookup (hashtable). Each time an operation arrives, the lookup is checked to see whether this operation was already processed. If so, skip it.

There is one additional condition: the lookup has to be transactional with the storage in which you save the states. This condition is a simple ‘all-or-none’ for storing the result of the operation together with the fact that this specific operation was already applied. Otherwise, if the lookup were updated first and storing the state afterwards failed, the change would never be saved, because on retry the operation would be skipped as already processed. The same applies to a situation where the lookup is updated at the very end: the operation result is saved, adding the info about the operation to the lookup fails, and the next time the same operation arrives it is applied one more time. That said, the lookup must be transactional with the medium where the state is saved.
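One way to get this ‘all-or-none’ behavior is to keep the lookup in the same database as the state and update both in one transaction. The sketch below uses an in-memory SQLite database purely as an example; the table names, open_store and apply_transfer are assumptions, not something taken from the previous posts.

```python
import sqlite3


def open_store():
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER NOT NULL)")
    db.execute("CREATE TABLE processed (operation_id TEXT PRIMARY KEY)")
    db.commit()
    return db


def apply_transfer(db, operation_id, account_id, amount):
    """Apply the transfer once; return False if it was already processed."""
    try:
        with db:  # one transaction: commit on success, roll back on error
            # The primary key makes a repeated operation id fail here.
            db.execute(
                "INSERT INTO processed (operation_id) VALUES (?)",
                (operation_id,))
            db.execute(
                "UPDATE accounts SET balance = balance + ? WHERE id = ?",
                (amount, account_id))
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate operation id: skip it
```

Because the insert into the lookup and the balance update commit or roll back together, neither of the two failure scenarios described above can leave the lookup and the state out of sync.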