Optimisation of queries against event-sourced systems

I hope you’re familiar with the event sourcing pattern. Switching from update-this-row thinking to a more behavioral paradigm is surely one of the most influential trends in system design. But how can a system storing only business deltas/events be queried? Can it be queried at all?

Project your views

To answer this need you can use projections. A projection is nothing more than a function applied to all events, or to a selected subset (by stream id, event category, or any dimension you can derive from the processed event). A similar solution can be found in Lokad CQRS, which is described in this post. So yes, there is a way of applying all the needed events to a view/projection, which can then be queried with the given parameters. Is there a way of optimizing the query responses?

Fast response

For sure there is! Consider a projection replaying all events for users changing their personal data, but applying only those which modify the user name. This is probably a very infrequent operation. The projection stores an id->name map, which various services use to display a user-friendly name. What can be done to improve the performance of the service storing this kind of mapping?

Consider storing two event sequence numbers:

  1. one remembering the index up to which the events have been scanned
  2. one remembering the last user-related event which actually changed the mapping
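
The two counters can be sketched in a few lines of Python; the event shape and all names here are my assumptions, not tied to any particular store:

```python
class UserNameProjection:
    """Folds user events into an id -> name map, tracking two sequence numbers."""

    def __init__(self):
        self.names = {}          # user id -> display name
        self.last_scanned = -1   # 1. index up to which events were scanned
        self.last_changed = -1   # 2. last event that actually changed the mapping

    def apply(self, seq, event):
        self.last_scanned = seq
        # only name changes modify the projection state
        if event["type"] == "UserNameChanged":
            self.names[event["user_id"]] = event["name"]
            self.last_changed = seq
```

The first number lets the projection resume scanning where it left off; the second moves only when the mapping itself changes.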

The second can easily be used to ETag all the responses. If the operation is infrequent, the responses can be answered with 304 Not Modified for long periods of time. This ETag-based optimization can always be applied: the more sparsely the projection state changes, the better the chance of reusing the client-cached response.
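
A hedged sketch of that check in Python; the tuple-shaped response and the stand-in projection object are illustrative only:

```python
from types import SimpleNamespace

def respond(projection, request_etag):
    """Answer a query, using the last-change sequence number as the ETag."""
    etag = str(projection.last_changed)
    if request_etag == etag:
        return 304, None, etag               # the client's cached copy is still fresh
    return 200, dict(projection.names), etag

# a stand-in for the projected view described above
projection = SimpleNamespace(names={1: "Alice"}, last_changed=41)
```

Until another name-changing event arrives, every conditional request is answered with a body-less 304.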

Cacheable REST API for time series

It’s been a while since the last blog post and getting the blog domain mapped back. Let’s restart with something fancy: a REST API!

The problem
Let’s look at any time series gathered in an eventually consistent medium (with a given time threshold). The entries/events gathered from various inputs are stored in a persistent data store. Queries for data are nothing more than requests for a given time slice. How could one design a REST API for accessing such a store?

The solution

Let’s start with the first request for a sample timeseries.scooletz.com link. In ASP.NET Web API routing terms, reading it would be something like:

GET http://timeseries.scooletz.com/api/data

As the result I’d return a list of entries from now - t1 seconds till now - t2 seconds. t1 and t2 are selected arbitrarily to match the eventual nature of the store and of the data-gathering process. Of course t1 > t2.

How to provide navigation for it? Nothing simpler comes to mind than the Link header, which lets you provide navigation in a semantic way. The proposed value would be:

Link: <http://timeseries.scooletz.com/api/data/before/2014_09_29_23_59_10>; rel="prev"

This allows getting the page previous to the current head. Why is the last segment a multiple of 10? That’s because of the time-chunk size. I’ve chosen to group all the entries in 10-second chunks. This also means that the first request can contain anything from no data to a full chunk span. There is an additional reason, revealed later.
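
The alignment to 10-second chunks can be sketched as follows; the helper names are mine, only the path format mirrors the links in this post:

```python
from datetime import datetime

CHUNK_SECONDS = 10

def chunk_start(ts):
    """Align a timestamp down to the start of its 10-second chunk."""
    return ts.replace(second=ts.second - ts.second % CHUNK_SECONDS, microsecond=0)

def before_link(ts):
    return "/api/data/before/" + chunk_start(ts).strftime("%Y_%m_%d_%H_%M_%S")
```
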

The second page, beside the data placed in the body of the response, would have the following value of the Link header:

Link: <http://timeseries.scooletz.com/api/data/before/2014_09_29_23_59_00>; rel="prev",
<http://timeseries.scooletz.com/api/data/after/2014_09_29_23_59_10>; rel="next"

The third page, and all the following ones, would contain the following structure of the Link header:

Link: <http://timeseries.scooletz.com/api/data/before/2014_09_29_23_58_50>; rel="prev",
<http://timeseries.scooletz.com/api/data/before/2014_09_29_23_59_10>; rel="next"

As you can see, from the third page on, all the pages use before links for navigation. There is a very good reason for it: never-expiring HTTP cache headers can be set for data in the past, which does not change. The second page could be cached, but the after link would always be answered with a non-cacheable response. As time passes, the response for http://timeseries.scooletz.com/api/data/before/2014_09_29_23_59_10 would change its headers to

Link: <http://timeseries.scooletz.com/api/data/before/2014_09_29_23_59_00>; rel="prev",
<http://timeseries.scooletz.com/api/data/before/2014_09_29_23_59_20>; rel="next"

These principles can be summed up as:

  1. cache-never the root request for data
  2. cache-forever before responses, as they point to the consistent past
  3. cache-never after responses, as they point to data still being gathered

This results in only the first request being non-cacheable if the client moves to the past (using prev links). One can find this very helpful with the infinite-scroll paradigm. Only data for the few first entries would be fetched; all the rest would come from the browser/client cache.
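
The three rules boil down to a tiny policy function; the concrete Cache-Control values below are my choice, not a prescription:

```python
def cache_headers(path):
    """before data lies in the immutable past: cache it forever.
    The root request and after links must stay fresh: never cache them."""
    if "/before/" in path:
        return {"Cache-Control": "public, max-age=31536000, immutable"}
    return {"Cache-Control": "no-cache"}
```
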


You should remember about ETags as well. Firefox, for instance, when a user hits F5, issues a request with max-age=0, trashing caches. If you add an ETag equal to the date included in the before link, you can verify it on the server side and immediately respond with 304 Not Modified. The before links contain immutable data after all :)

Latency vs throughput

There are two terms you should consider when designing your system. The bigger and more robust the system you design, the deeper your understanding of these two values should be.

Throughput is nothing more than the number of operations per unit of time which your system can process. For instance, for a web site one may want to easily handle one thousand requests per second. To define the needed throughput you can use an estimation like:

given the number of users concurrently using the system set to 1000,
given the estimated number of user actions per second set to 1,
the system should have a throughput equal to 1000 req/s

Is it a good estimation? I’d reconsider for sure:

  1. peak numbers of concurrent users. In the majority of systems there are hours when your servers do nothing. On the other hand, there are hours when all of your users are logged in
  2. the number of actions per second. The value of 1 operation/s may be right for a person seeing a computer for the very first time. It’s much lower than a standard PC user’s rate of interaction

The obvious thing one can do to increase throughput is batching. It’s cheaper to write and fsync/FlushFileBuffers after writing a batch of entries rather than syncing after every single one. The same goes for network IO: sending a bigger frame containing more messages leads to increased throughput.

Latency is the time till request completion. You should forget about the silly average value and go for the median, quartiles and percentiles, especially the 99th, 99.9th and beyond. Don’t be fooled by calculating average latency across a whole day. Especially for systems under heavy load, these many nines will be more common than you think. To get a taste of it you should definitely watch Gil Tene discussing some common pitfalls encountered in measuring and characterizing latency.
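
A toy illustration of why the average misleads, using a nearest-rank percentile (the implementation details are mine):

```python
def percentile(samples, p):
    """Nearest-rank percentile: the sample at rank ceil(n * p / 100)."""
    ordered = sorted(samples)
    rank = -(-len(ordered) * p // 100)  # ceiling division
    return ordered[max(int(rank), 1) - 1]

# a long-tailed latency distribution: mostly 1 ms, one 1000 ms outlier
latencies = [1] * 99 + [1000]
```

Here the average is about 11 ms, while the median and even the 99th percentile are 1 ms; only the 99.9th reveals the 1000 ms outlier.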

Throughput vs latency
Given these definitions, is it good enough to ask for maximized throughput? My answer is that it isn’t.

Without defined and measured latency, throughput can be gained simply by batching requests for the slowest resources as aggressively as possible, at the cost of each individual request’s completion time.

You should satisfy other requirements as well, or at least provide meaningful statistics, like Cassandra’s MBeans or EventStore’s queue lengths.

Storm processor – bolts and joins

Storm, recently moved to the Apache foundation, is a powerful stream processing library open sourced by Twitter. It provides the resources needed to scale processing out across multiple machines, with at-least-once guarantees, or exactly-once using Trident. The library is based on two basic elements:

  1. spouts – sources of tuple streams
  2. bolts – processing units, consuming and emitting different streams of tuples

which are combined in a topology: a mesh of elements emitting and consuming events in the order of data processing.
Streams, unlike in EventStore, are not cheap; they represent a logical flow of data rather than an aggregate boundary. One stream can be emitted by more than one spout or bolt. Further discussion of streams is beyond the scope of this article.
The bolt declarer, used in the topology builder, implements plenty of interfaces for defining which tuples of which streams are consumed. This lets you assign a given bolt instance to handle a given set of tuples from a given stream of data, for example:

  1. fieldsGrouping lets you bind tuples from the given stream which contain the declared fields to a given instance of a bolt. It means that tuples with a given field value will be routed to the same instance of the bolt class! This provides very powerful behavior, letting you group tuples by any dimension
  2. localOrShuffleGrouping provides you with the ability to route data within the same worker process or, if that condition cannot be satisfied, to move data to another worker selected ‘randomly’. When no grouping is needed, this improves performance by collocating execution and skipping the network overhead.
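
Storm implements these groupings in Java; the idea behind fieldsGrouping can be sketched in Python as hashing the grouping fields’ values modulo the task count:

```python
def fields_grouping(tup, group_fields, task_count):
    """Pick a bolt task from the values of the grouping fields, so tuples
    with equal field values always reach the same bolt instance."""
    key = tuple(tup[field] for field in group_fields)
    return hash(key) % task_count
```
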

A bolt isn’t limited to consuming tuples from only one grouping. It can join multiple streams grouped in multiple ways. This brings another opportunity for data repartitioning. For example, an application emitting streams of data such as production-environment exceptions and user transactions can easily raise an alarm when a given user experiences more than one exception every 10 transactions. A simple bolt using two fields groupings can deal with it easily.
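
A hedged sketch of such a joining bolt, in plain Python rather than Storm’s API; the per-user counters work because both streams would be fields-grouped on the user:

```python
from collections import defaultdict

class AlarmBolt:
    """Raises an alarm when a user sees more than one exception
    per 10 transactions (an illustrative threshold)."""

    def __init__(self):
        self.transactions = defaultdict(int)
        self.exceptions = defaultdict(int)
        self.alarms = []

    def on_transaction(self, user):
        self.transactions[user] += 1

    def on_exception(self, user):
        self.exceptions[user] += 1
        # alarm when exceptions / transactions exceeds 1 / 10
        if self.exceptions[user] * 10 > max(self.transactions[user], 1):
            self.alarms.append(user)
```
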
I hope this short introduction encourages you to dive into Storm. It’s a very powerful tool, especially in Complex Event Processing, with scale-out possibilities.

Application deployment: multi nuget based vs custom deployment package

So far I have encountered a few patterns for deploying applications. Speaking of those based on NuGet packages, I can easily distinguish two of them.

Package with references
That’s the first one. Frequently it’s not a custom package. It’s based on the main solution project (the application part) and is pushed to a feed along with packages based on other solution projects. This leads towards a design where packages:

  1. are small
  2. are meant to be cross-app reusable
  3. mirror the solution and project dependencies
  4. have references to other packages
  5. need a NuGet feed to resolve other dependencies at deployment time

Unfortunately, packages of this kind are not stable build artifacts. One can easily change multiple apps by pushing libraries used by installed projects to the NuGet feed. Packages, once built and deployed, may change between publications on environments, which greatly diminishes the meaning of a deployment package. Only if one totally controls pushing to the feeds and provides staging for feeds may this work; otherwise it can be considered error prone (one cannot tell whether a package, published once, can be republished in a higher environment).

Self-sufficient package
which is the second one. This kind of package, prepared specially for deployment, consists of all the items required by the given deployment, which provides packages that:

  1. are bigger
  2. are targeted towards deployment
  3. are orthogonal to the solution organization
  4. have no references to other packages
  5. need no NuGet feed to resolve other dependencies at deployment time

This kind of artifact, used by Octopus Deploy, consists of snapshots of all the dependencies at the given moment of the build. Snapshots, immutable and stable by default, bring self-sufficient packages which can simply be extracted in a given environment. For the price of declaring a custom nuspec, this brings repeatable deployment on all environments and is my preferred way of doing deployments, even if you don’t want to use Octopus Deploy for some reason.

Queues and threads of your organization and work in progress limits

This will be a tale of two organizations.

The first organization
was made of people living in asynchrony. They used emails to communicate what they wanted; they registered their needs and orders in systems. The same people read emails from others, queried the systems and fulfilled orders. Their work was oriented around consuming what they got from different queues, like the mentioned email boxes or ticket-tracking systems. Meetings engaging bigger groups were exceptional, as they interfered with the queue orientation.

The second organization
was made of people living in synchrony. They used phones to communicate with others; they walked their buildings’ corridors to meet another person and ask whether he/she could do something for them. Meetings were important as well. Their work was oriented around synchronously engaging groups of people.

Your organization
is a mixture of these two for sure. If the first kind of organization prevails, people can easily reduce the work in progress, which is one of the Kanban topics. Reduced work in progress easily increases performance by reducing the number of context switches one has to make. Additionally, queues like issues/tickets can easily be monitored and shared if needed. You may have problems with emails, but one can, for instance, forward them to another team member.
If the second kind of organization triumphs, the work turns into running/talking/meeting. Sending an email engages one person; a call or talking to someone, two; a meeting, even more; which leads toward short periods of time spent on real work with no interruptions.
What kind of organization is your company? How do you act and how would you like to act?

Pearls: EventStore transaction log

I thought for a while about presenting a few projects which are, in my opinion, real pearls. Let’s start with EventStore and one of its aspects: the transaction log.
If you’re not familiar with this project, EventStore is a stream database providing complex event processing. It’s oriented around streams of events which can easily be aggregated or repartitioned with projections. Based on ever-appended streams and projections chasing those streams, one can build truly powerful logic around processing events.
One of the interesting aspects of EventStore is its storage engine. You can find a bit of description in here. ES does not abstract the storage away; the storage is a built-in part of the database itself. Let’s take a look at its parts before discussing it further.

Appending to the log
One of the building blocks of ES is the SEDA architecture: communication within the db is based on publishing and consuming messages, which one can notice when reviewing StorageWriterService. The service subscribes to multiple messages, mentioned in implementations of the IHandle interface. The arising question is how often the service flushes its messages to disk. One can notice that the method EnqueueMessage, beside enqueuing incoming messages, counts the ones marked by the interface IFlushableMessage. What is it for?
Each Handle method calls Flush at its very end. Additionally, as EnqueueMessage increases the counter of messages requiring a flush, each Handle method decreases the counter when it handles a flushable message. This brings us to the conclusion that the mentioned counter equals 0 iff there are no more flushable messages in the queue.

Flushing the log
Once Flush is called, it is checked whether:

  • the call was made with force=true (this never happens) or
  • there are no more flushable messages in the queue or
  • the given time since the last flush has passed

This provides very powerful batching behavior. Under stress, the flush-to-be counter will be constantly greater than 0, resulting in a flush once every given period of time. Under less stress, with no more flushables in the queue, ES will flush after every message which needs the log file flushed.
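
Putting the counter and the three conditions together, a rough Python model (all names and the interval value are mine, not ES internals):

```python
import time
from collections import deque

class FlushableMessage:
    pass

class WriterSketch:
    """Flush when forced, when no flushable messages remain queued,
    or when enough time has passed since the last flush."""

    FLUSH_INTERVAL = 1.0  # seconds; illustrative, the real interval is far shorter

    def __init__(self):
        self.queue = deque()
        self.pending_flushables = 0
        self.last_flush = time.monotonic()
        self.flushes = 0

    def enqueue(self, message):
        if isinstance(message, FlushableMessage):
            self.pending_flushables += 1
        self.queue.append(message)

    def handle_one(self):
        message = self.queue.popleft()
        if isinstance(message, FlushableMessage):
            self.pending_flushables -= 1
        # ... process the message, append to the log ...
        self.flush()

    def flush(self, force=False):
        elapsed = time.monotonic() - self.last_flush
        if force or self.pending_flushables == 0 or elapsed > self.FLUSH_INTERVAL:
            self.last_flush = time.monotonic()
            self.flushes += 1  # the real service syncs the chunk file here
```

With three flushable messages queued at once, only the last Handle call actually hits the disk.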

Acking the client
The final part of the processing is the acknowledgement. The client should be informed about a transaction being persisted to disk. I spent a bit of time (with the help of Greg Young and James Nugent) chasing the place where the ack is generated. It does not happen in the StorageWriterService. What’s responsible for considering the message written, then? Here comes the second part of the solution: the StorageChaser. In a dedicated thread, in an infinite loop, a method ChaserIteration is called. The method tries to read the next record from a chunk of unmanaged memory that was ensured to be flushed by the StorageWriterService. Once the chaser finds a CommitRecord, written when a transaction is committed, it acks the client by publishing StorageMessage.CommitAck in the ProcessCommitRecord method. The message will be translated to a client message confirming the commit and sent back to the client.
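
A rough model of the chaser side; record types other than CommitRecord, and all names, are illustrative:

```python
class ChaserSketch:
    """Reads records up to the writer's flushed position and acks commits."""

    def __init__(self, log):
        self.log = log       # list of (position, record_type) pairs
        self.position = 0
        self.acks = []

    def iteration(self, flushed_up_to):
        # never read past what the writer has durably flushed
        while self.position < flushed_up_to:
            position, record_type = self.log[self.position]
            if record_type == "CommitRecord":
                self.acks.append(position)  # stands in for publishing CommitAck
            self.position += 1
```
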

Sum up
One cannot deny the beauty and simplicity of this solution. One component tries to flush as fast as possible, or batches a few messages if it cannot endure the pressure. Another waits for the position to which the file is flushed to increase. Once it changes, it reads the record (from the in-memory chunk matched with the file on disk), processes it and sends acks. Simple and powerful.