Pearls: putting EventStore in reverse

Pearls of design, beautiful patterns, efficient approaches. After covering Jil and its extremely efficient serialization of primitives, it's time to put things in reverse. By things, in this case, I mean EventStore, the event-centric database that I already presented once. Now it's time to visit its ability to move back in time and traverse its log in the opposite direction.

Log all the things

EventStore uses a traditional log mechanism for storing its data in a transactional way. The mechanism can be described as a never-ending log file with new data appended at its end. Of course, different databases implement the "never-ending file" in different ways; still, the logical approach is the same: whenever data are updated or inserted, these operations are logged to a file.

Appending operations has an interesting property. Whenever other components break, or whenever hardware restarts, the log is the single source of truth that can be traversed and reapplied to all the components that didn't catch up before the failure. How do we make sure that a specific piece of the log was written properly, though? That the data we stored are the data that survived the crash?

Make it twice

Below you can see a piece of code that is extracted from the writer:
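
Here is the pattern in sketch form; the LogRecord type and the names are illustrative rather than the exact EventStore internals.

using System.IO;

// An illustrative stand-in for EventStore's record type.
class LogRecord
{
    public byte[] Payload = { 1, 2, 3 };
    public void WriteTo(BinaryWriter writer) => writer.Write(Payload);
}

class WriterSketch
{
    static void Append(LogRecord record, Stream chunk)
    {
        var buffer = new MemoryStream();
        var writer = new BinaryWriter(buffer);

        buffer.Position = 4;                 // leave the 4 initial bytes empty
        record.WriteTo(writer);              // write the whole record
        var length = (int)buffer.Length - 4; // calculate its length
        writer.Write(length);                // write the length at the end...
        buffer.Position = 0;
        writer.Write(length);                // ...and at the beginning
        buffer.WriteTo(chunk);               // append the framed record to the log
    }
}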

We can see the following:

  1. there’s a buffer, which is a regular MemoryStream
  2. its position is set to 4, leaving 4 initial bytes empty
  3. the whole record is written to the buffer
  4. its length is calculated
  5. the length is written at the beginning and at the end

This looks like storing 4 bytes too much. On the other hand, it's a simple check mechanism, used internally by EventStore to verify the consistency of the log: "if these two values do not match, something is seriously wrong in chunk" (an original comment from the code). Could the same length, written twice, be used for something more?

Put it in reverse, Terry!

EventStore has additional indexing capabilities that allow it to move back and forth pretty fast. What if we had none and still wanted to travel through the log?

Consider the following scenario. You use a log-based approach for your service. Then, for whatever reason, you need to read the 10th entry from the end. Having a regular log, you'd need to do the following:

  1. Read the log from the beginning, counting entries all the way to the end; only then do you know which one was the 10th from the end. Sorry, there's no other way, and it's a painfully blunt method (see the sketch below).
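
Something like this, assuming simple length-prefixed framing (a hypothetical helper, not EventStore code):

using System.IO;
using System.Text;

static class ForwardScan
{
    // Hops over length-prefixed records from the start; to locate the
    // 10th entry from the end, every record has to be visited first.
    public static long CountEntries(Stream log)
    {
        using var reader = new BinaryReader(log, Encoding.UTF8, leaveOpen: true);
        long count = 0;
        log.Position = 0;
        while (log.Position < log.Length)
        {
            var length = reader.ReadInt32(); // read the length prefix
            log.Position += length;          // skip the record body
            count++;
        }
        return count;
    }
}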

If we have the length written twice though, what you can do is read the last 4 bytes of the log (they will always hold a length) and then move backward to the previous entry. By how many bytes to move backward?

var moveBackBy = length + 2 * sizeof(int);

as each of the two lengths is written as a single 4-byte integer.
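
Here's a sketch of that index-free backward traversal over the double-length format (a hypothetical reader, not the EventStore implementation):

using System;
using System.IO;
using System.Text;

static class BackwardScan
{
    // Walks the log from its end toward its beginning, handing each
    // record's bytes to the callback. No extra index is needed: the
    // 4-byte length suffix says where the record body starts.
    public static void ReadBackward(Stream log, Action<byte[]> onRecord)
    {
        using var reader = new BinaryReader(log, Encoding.UTF8, leaveOpen: true);
        var position = log.Length;

        while (position > 0)
        {
            log.Position = position - sizeof(int);          // seek to the length suffix
            var length = reader.ReadInt32();

            log.Position = position - sizeof(int) - length; // start of the record body
            onRecord(reader.ReadBytes(length));

            position -= length + 2 * sizeof(int);           // move back by the whole entry
        }
    }
}

Reading the 10th entry from the end becomes just ten iterations of this loop, no matter how big the log is.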

With this, you don't need any additional index. The log itself is sufficient to move both forward and backward.

Summary

Data structures and data formats are not popular topics nowadays. As you saw above, adding just one integer gave a lot of extra capabilities to a simple append-only file. Next time, before "just using" another data store, think about your data and the format you're gonna use. It can make a real difference.


Pearls: EventStore transaction log

I thought for a while about presenting a few projects which are, in my opinion, real pearls. Let's start with EventStore and one of its aspects: the transaction log.
If you're not familiar with this project, EventStore is a stream database providing complex event processing. It's oriented around streams of events, which can be easily aggregated or repartitioned with projections. Based on ever-appended streams and projections chasing those streams, one can build truly powerful logic around processing events.
One of the interesting aspects of EventStore is its storage engine. You can find a bit of description here. ES does not abstract the storage away; the storage is a built-in part of the database itself. Let's take a look at its parts before discussing it further.

Appending to the log
One of the building blocks of ES is the SEDA architecture: the communication within the db is based on publishing and consuming messages, which one can notice reviewing StorageWriterService. The service subscribes to multiple messages, mentioned in the implementations of the IHandle interface. The question that arises is: how often does the service flush its messages to disk? One can notice that the EnqueueMessage method, beside enqueuing incoming messages, counts the ones marked with the IFlushableMessage interface. What is it for?
Each Handle method calls Flush at its very end. Additionally, as EnqueueMessage increases the counter of messages requiring a flush, each Handle method decreases the counter when it handles a flushable message. This brings us to the conclusion that the mentioned counter equals 0 iff there are no more flushable messages in the queue.
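
In sketch form, the counting scheme might look as follows (the types and names are illustrative, based on the description above rather than the actual source):

using System.Collections.Concurrent;
using System.Threading;

interface IMessage { }
interface IFlushableMessage : IMessage { } // marker for messages requiring a flush

class StorageWriterSketch
{
    private readonly ConcurrentQueue<IMessage> _queue = new();
    private int _flushMessagesInQueue;

    public void EnqueueMessage(IMessage message)
    {
        if (message is IFlushableMessage)
            Interlocked.Increment(ref _flushMessagesInQueue); // one more flushable pending
        _queue.Enqueue(message);
    }

    public void Handle(IFlushableMessage message)
    {
        // ... append the record to the log buffer ...
        Interlocked.Decrement(ref _flushMessagesInQueue);     // this flushable is handled
        Flush();                                              // called at the very end
    }

    private void Flush(bool force = false) { /* see the next section */ }
}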

Flushing the log
Once Flush is called, it checks whether:

  • the call was made with force=true (this never happens) or
  • there are no more flush messages in the queue or
  • a given period of time since the last flush has passed

This provides a very powerful batching behavior. Under stress, the to-flush counter will constantly stay greater than 0, so the log gets flushed once every given period of time. Under less stress, with no more flushables in the queue, ES will flush after every message which needs the log file flushed.
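
The condition, sketched from this description (the interval value and the names are assumptions, not the actual source):

using System;
using System.Threading;

class FlushConditionSketch
{
    private int _flushMessagesInQueue; // maintained as in the previous sketch
    private DateTime _lastFlush = DateTime.UtcNow;
    private static readonly TimeSpan FlushInterval = TimeSpan.FromMilliseconds(5); // assumed value

    private void Flush(bool force = false)
    {
        var noFlushablesLeft = Volatile.Read(ref _flushMessagesInQueue) == 0;
        var intervalPassed = DateTime.UtcNow - _lastFlush >= FlushInterval;

        if (force || noFlushablesLeft || intervalPassed)
        {
            // fsync the chunk file and advance the flushed-to checkpoint here
            _lastFlush = DateTime.UtcNow;
        }
    }
}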

Acking the client
The final part of the processing is the acknowledgement. The client should be informed about persisting a transaction to disk. I spent a bit of time (with the help of Greg Young and James Nugent) chasing the place where the ack is generated. It does not happen in the StorageWriterService. What's responsible for considering the message written, then? Here comes the second part of the solution, the StorageChaser. In a dedicated thread, in an infinite loop, a method ChaserIteration is called. The method tries to read the next record from a chunk of unmanaged memory that was ensured to be flushed by the StorageWriterService. Once the chaser finds a CommitRecord, written when a transaction is committed, it acks the client by publishing StorageMessage.CommitAck in the ProcessCommitRecord method. The message will be translated to a client message confirming the commit and sent back to the client.
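
The loop, sketched from the description above (illustrative shapes, not the actual StorageChaser code):

using System;
using System.Threading;

abstract class LogRecord { }
sealed class CommitRecord : LogRecord { } // stands in for the real commit record

class StorageChaserSketch
{
    private volatile bool _stop;
    private long _chaserPosition;

    public void Chase(Func<long> readFlushedPosition,
                      Func<long, (LogRecord Record, long Next)?> tryReadAt,
                      Action<CommitRecord> publishCommitAck)
    {
        while (!_stop)
        {
            // Only read up to the position the writer has already flushed.
            var flushedTo = readFlushedPosition();
            var progressed = false;

            while (_chaserPosition < flushedTo)
            {
                var read = tryReadAt(_chaserPosition);
                if (read == null)
                    break;

                if (read.Value.Record is CommitRecord commit)
                    publishCommitAck(commit); // translated to a client ack downstream

                _chaserPosition = read.Value.Next;
                progressed = true;
            }

            if (!progressed)
                Thread.Sleep(1); // back off when fully caught up
        }
    }
}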

Sum up
One cannot deny the beauty and simplicity of this solution. One component tries to flush as fast as possible, or batches a few messages if it cannot endure the pressure. Another one waits for the position to which the file is flushed to be increased. Once it changes, it reads the record (from the in-memory chunk matched with the file on disk), processes it, and sends acks. Simple and powerful.

I should’ve used EventStore

One of the most important features of EventStore is the ability to ask questions as if they were asked in the past. You don't have to manually rerun all the stored information to repartition or aggregate it. All you've got to do is write a new projection, which will run from the very beginning (almost always: take a look at scavenging) till now.
There's no question you should've asked earlier that cannot be added later, with no mental overhead of manually rerouting data through the pipeline once again. Nice :)