Latency vs throughput

There are two terms you should consider when designing your system. The bigger and more robust the system you design, the deeper your understanding of these two values should be.

Throughput
Throughput is nothing more than the number of operations your system can process per given unit of time. For instance, for a web site one may want to easily handle one thousand requests per second. To define the needed throughput you can use an estimation like

given the number of users concurrently using the system set to 1000,
given the estimated number of actions per user per second set to 1,
the system should have a throughput equal to 1000 req/s

Is it a good estimation? I’d reconsider for sure:

  1. peak values of concurrent users. In the majority of systems there are hours when your servers do nothing. On the other hand, there are hours when all of your users are logged in.
  2. number of actions per second. The value of 1 operation/s may be good for a person seeing a computer for the very first time. It’s much lower than the rate of a standard PC user.
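
The estimation above can be written down as a tiny calculation, which makes it easy to plug in peak values instead of the optimistic ones. This is only a sketch: the function name and the peak numbers (2500 users, 2 actions per second) are made up for illustration.

```python
def required_throughput(concurrent_users, actions_per_user_per_second):
    """Requests per second the system has to sustain for the given load."""
    return concurrent_users * actions_per_user_per_second


# The naive estimate from the text: 1000 users, 1 action per second each.
naive = required_throughput(1000, 1)

# Hypothetical peak hour: more users logged in, each of them clicking faster.
peak = required_throughput(2500, 2)

print(naive, peak)
```

Sizing the system for the naive number would leave it well short of what such a peak hour asks for.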

The obvious thing one can do to increase throughput is batching. It’s cheaper to write a batch of entries and call fsync/FlushFileBuffers once than to sync after every entry. The same goes for network IO: sending a bigger frame containing more messages leads to increased throughput.
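
A minimal sketch of the idea, assuming an append-only log file: the same entries are written twice, once syncing after every entry and once syncing after every batch. The function name, the file names and the batch size of 50 are assumptions made for the example.

```python
import os
import tempfile


def write_entries(path, entries, batch_size):
    """Append entries to a file, calling fsync once per batch.

    Returns the number of fsync calls that were needed.
    """
    syncs = 0
    with open(path, "ab") as f:
        for i, entry in enumerate(entries, start=1):
            f.write(entry)
            if i % batch_size == 0:
                f.flush()              # push Python's buffer to the OS
                os.fsync(f.fileno())   # ask the OS to push it to the disk
                syncs += 1
        if len(entries) % batch_size != 0:
            # Sync the last, incomplete batch as well.
            f.flush()
            os.fsync(f.fileno())
            syncs += 1
    return syncs


entries = [b"entry\n"] * 200
with tempfile.TemporaryDirectory() as tmp:
    per_entry = write_entries(os.path.join(tmp, "a.log"), entries, batch_size=1)
    batched = write_entries(os.path.join(tmp, "b.log"), entries, batch_size=50)

print(per_entry, batched)
```

Both files end up with the same content, but the batched version pays for the expensive sync far less often. The price is that an entry may wait for its batch to fill up, which is exactly where latency enters the picture.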

Latency
Latency is the time until a request completes. You should forget about the silly average value and go for the median, quartiles and percentiles, especially the 99th, 99.9th and higher. Don’t be fooled by calculating the average latency across a whole day. Especially for systems under lots of load, requests falling into these many nines will be more common than you think. To get a taste of it you should definitely watch Gil Tene discussing some common pitfalls encountered in measuring and characterizing latency.
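
To see why the average hides the problem, here is a small sketch using the nearest-rank definition of a percentile. The latency samples are made up: 980 fast requests and 20 very slow ones.

```python
import math


def percentile(samples, p):
    """Nearest-rank percentile: the smallest sample such that at least
    p percent of all samples are less than or equal to it."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


# Made-up latencies in milliseconds.
latencies = [10] * 980 + [2000] * 20

mean = sum(latencies) / len(latencies)
median = percentile(latencies, 50)
p99 = percentile(latencies, 99)
p999 = percentile(latencies, 99.9)

print(mean, median, p99, p999)
```

The mean of about 50 ms and the median of 10 ms look harmless, while the 99th percentile shows that some users wait two seconds.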

Throughput vs latency
Having these definitions, is it good enough to ask for maximized throughput? My answer is that it isn’t.

Without a defined and measured latency, throughput can be maximized simply by batching requests in whatever way is optimal for the slowest resources, regardless of how long each request has to wait.

You should satisfy other requirements as well, or at least provide meaningful statistics, like the MBeans of Cassandra DB or the queue lengths of EventStore.

Storm processor – bolts and joins

Storm, recently moved to the Apache Software Foundation, is a powerful stream processing library open sourced by Twitter. It provides the resources needed to scale out processing across multiple machines, with at-least-once guarantees, or exactly-once when using Trident. The library is based on two basic elements:

  1. spouts – sources of tuple streams
  2. bolts – processing units, consuming and emitting different streams of tuples

which are combined in a topology, a mesh of elements emitting and consuming events in the order of data processing.
Streams, unlike in EventStore, are not cheap and represent a logical flow of data rather than an aggregate boundary. One stream can be emitted by more than one spout or bolt. Further discussion of streams is beyond the scope of this article.
The bolt declarer, used in the topology builder, implements plenty of interfaces that let you define which tuples of which streams a bolt consumes. It allows one to assign a given bolt instance to handle a given set of tuples from a given stream of data, for example:

  1. fieldsGrouping lets you bind tuples from the given stream to an instance of the bolt based on the declared fields they contain. This means that tuples with the same value of a given field will be routed to the same instance of the bolt class! This is a very powerful behavior, letting you group tuples by any dimension.
  2. localOrShuffleGrouping routes data within the same worker process or, if that cannot be satisfied, moves it to another worker selected ‘randomly’. When no grouping is needed, this lets you improve performance by collocating execution and skipping network overhead.
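
The routing idea behind a fields grouping can be sketched in a few lines. This is not Storm’s code or API, only an illustration of the principle: hash the values of the grouping fields and use the result to pick one of the bolt instances. The function name and the choice of CRC32 as the hash are assumptions.

```python
import zlib


def pick_instance(grouping_values, instance_count):
    """Map the values of the grouping fields to one of the bolt instances.

    A stable hash guarantees that equal values always land on the same instance.
    """
    key = "|".join(str(value) for value in grouping_values).encode("utf-8")
    return zlib.crc32(key) % instance_count


# Tuples of the same user always reach the same instance, whatever else they carry.
first = pick_instance(["user-42"], instance_count=4)
second = pick_instance(["user-42"], instance_count=4)
print(first == second)
```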

A bolt isn’t limited to consuming tuples from only one grouping. It can join multiple streams grouped in multiple ways. This brings another opportunity for data repartitioning. For example, an application emitting streams of data such as production exceptions and user transactions can easily raise an alarm when a given user experiences more than one exception every 10 transactions. A simple bolt using two fields groupings can deal with it easily.
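
The logic such a bolt would carry can be sketched without Storm at all. The class below is a hypothetical stand-in, not a Storm bolt: it assumes both streams were grouped by user id, so one instance sees all the transactions and exceptions of a given user, and it counts exceptions in windows of 10 transactions.

```python
from collections import defaultdict


class ExceptionRateTracker:
    """Hypothetical stand-in for a bolt joining two streams grouped by user id."""

    def __init__(self, window=10, max_exceptions=1):
        self.window = window
        self.max_exceptions = max_exceptions
        self.transactions = defaultdict(int)
        self.exceptions = defaultdict(int)

    def on_transaction(self, user_id):
        """Handle a tuple from the transactions stream."""
        self.transactions[user_id] += 1
        if self.transactions[user_id] >= self.window:
            # The window is complete: start counting afresh for this user.
            self.transactions[user_id] = 0
            self.exceptions[user_id] = 0

    def on_exception(self, user_id):
        """Handle a tuple from the exceptions stream. Returns True to raise an alarm."""
        self.exceptions[user_id] += 1
        return self.exceptions[user_id] > self.max_exceptions


tracker = ExceptionRateTracker()
tracker.on_transaction("alice")
print(tracker.on_exception("alice"), tracker.on_exception("alice"))
```

The second exception within the same window of transactions raises the alarm, and other users are not affected because their counters are kept separately.
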
I hope this short introduction will encourage you to dive into Storm. It’s a very powerful tool, especially in Complex Event Processing, with scale out possibilities.

Application deployment: multi nuget based vs custom deployment package

So far I have encountered a few patterns for deploying applications. Speaking of those based on NuGet packages, I can easily distinguish two of them.

Package with references
that’s the first one. Frequently it’s not a custom package. It’s based on the main solution project (the application part) and is pushed to a feed along with packages based on the other solution projects. This leads towards a design where packages:

  1. are small
  2. are meant to be cross-app reusable
  3. mirror the solution and project dependencies
  4. have references to other packages
  5. need a NuGet feed to resolve other dependencies at deployment time

Unfortunately, packages of this kind are not stable build artifacts. One can easily change multiple apps by pushing libraries used by installed projects to the NuGet feed. Packages once built and deployed may change between publications to successive environments, which greatly diminishes the meaning of a deployment package. If and only if one totally controls pushing to the feeds and provides staging for feeds, this may work; otherwise it can be considered error prone (one cannot tell whether a package published once can be republished in a higher environment).

Self-sufficient package
which is the second one. This kind of package, prepared specially for deployment, consists of all items required by the given deployment, which results in packages that:

  1. are bigger
  2. are targeted towards deployment
  3. are orthogonal to the solution organization
  4. have no references to other packages
  5. need no NuGet feed to resolve other dependencies at deployment time

Artifacts of this kind, used by Octopus Deploy, consist of snapshots of all dependencies at the moment of the build. Snapshots, immutable and stable by default, give you self-sufficient packages which can simply be extracted in a given environment. For the price of declaring a custom nuspec, this brings repeatable deployment on all environments, and it is my preferred way of doing deployments, even if you don’t want to use Octopus Deploy for some reason.

Queues and threads of your organization and work in progress limits

This will be a tale of two organizations.

The first organization
was made up of people living in asynchrony. They used emails to communicate what they wanted, and they registered their needs and orders in some systems. The same people read emails from others, queried the systems and fulfilled orders. Their work was oriented around consuming what they got in different queues, like the mentioned email boxes or ticket tracking systems. Meetings engaging bigger groups were exceptional, as they interfered with the queue orientation.

The second organization
was made up of people living in synchrony. They used phones to communicate with others, and they walked through the corridors of their buildings to meet another person and ask whether he/she could do something for them. Meetings were important as well. Their work was oriented around synchronously engaging groups of people.

Your organization
is a mixture of these two for sure. If the first kind of organization prevails, people can easily reduce the work in progress, which is one of the Kanban topics. Reduced work in progress increases performance by reducing the number of context switches one has to make. Additionally, queues like issues/tickets can be easily monitored and shared if needed. You may have problems with emails, but one can, for instance, forward them to another team member.
If the second kind of organization triumphs, work turns into running/talking/meeting. Sending an email engages one person, a call or a talk with someone engages two, and a meeting even more, which leaves only short periods of time spent on real work with no interruptions.
What kind of organization is your company? How do you act and how would you like to act?

Pearls: EventStore transaction log

I thought for a while about presenting a few projects which are, in my opinion, real pearls. Let’s start with EventStore and one of its aspects: the transaction log.
If you’re not familiar with this project, EventStore is a stream database providing complex event processing. It’s oriented around streams of events, which can be easily aggregated or repartitioned with projections. Based on ever-appended streams and projections chasing the streams, one can build truly powerful logic around processing events.
One of the interesting aspects of EventStore is its storage engine. You can find a bit of description in here. ES does not abstract storage away; the storage is a built-in part of the database itself. Let’s take a look at its parts before discussing it further:

Appending to the log
One of the building blocks of ES is the SEDA architecture: communication within the db is based on publishing and consuming messages, which one can notice when reviewing StorageWriterService. The service subscribes to multiple messages, listed as implementations of the IHandle interface. The question that arises is how often the service flushes its messages to disk. One can notice that the EnqueueMessage method, besides enqueuing incoming messages, counts the ones marked with the IFlushableMessage interface. What is it for?
Each Handle method calls Flush at its very end. Additionally, as EnqueueMessage increases the counter of messages requiring a flush, each Handle method decreases the counter when it handles a flushable message. This brings us to the conclusion that the mentioned counter is equal to 0 iff there are no more flushable messages in the queue.

Flushing the log
Once Flush is called, a condition is checked whether:

  • the call was made with force=true (this never happens) or
  • there are no more flushable messages in the queue or
  • the given time since the last flush has passed

This provides a very powerful batching behavior. Under stress, the flush-to-be counter will be constantly greater than 0, so flushing happens every given period of time. Under less stress, with no more flushables in the queue, ES will flush after every message which needs the log file to be flushed.
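
The decision described above fits in a few lines. The sketch below is not EventStore’s code (which is written in C#); it only mirrors the three conditions and the counter, and every name in it is made up for the illustration.

```python
import time


class FlushPolicy:
    """Illustrative model of the flush decision: force, empty queue, or timeout."""

    def __init__(self, max_delay_seconds, clock=time.monotonic):
        self.max_delay = max_delay_seconds
        self.clock = clock
        self.pending_flushables = 0      # flushable messages still in the queue
        self.last_flush = clock()

    def enqueued(self, flushable):
        if flushable:
            self.pending_flushables += 1

    def handled(self, flushable):
        if flushable:
            self.pending_flushables -= 1

    def should_flush(self, force=False):
        timed_out = self.clock() - self.last_flush >= self.max_delay
        return force or self.pending_flushables == 0 or timed_out

    def flushed(self):
        self.last_flush = self.clock()


policy = FlushPolicy(max_delay_seconds=0.5)
policy.enqueued(True)
policy.handled(True)
print(policy.should_flush())  # the queue holds no more flushables
```

With a busy queue the counter stays above zero and only the timeout triggers a flush, which gives batching. With an idle queue every handled message is flushed straight away, which gives low latency.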

Acking the client
The final part of the processing is the acknowledgement. The client should be informed that a transaction has been persisted to disk. I spent a bit of time (with the help of Greg Young and James Nugent) chasing the place where the ack is generated. It does not happen in the StorageWriterService. What’s responsible for considering the message written, then? Here comes the second part of the solution, the StorageChaser. In a dedicated thread, in an infinite loop, the ChaserIteration method is called. The method tries to read the next record from a chunk of unmanaged memory that was ensured to be flushed by the StorageWriterService. Once the chaser finds a CommitRecord, written when a transaction is committed, it acks the client by publishing StorageMessage.CommitAck in the ProcessCommitRecord method. The message will be translated to a client message confirming the commit and sent back to the client.
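
A much simplified model of one chaser iteration, assuming the log is a plain list of records and positions are list indexes. It is not the StorageChaser implementation, and the record shape and function name are invented for the example. The chaser may only read up to the position the writer has already flushed.

```python
def chaser_iteration(log, chaser_position, flushed_position):
    """Read the records that were flushed but not yet seen by the chaser.

    Returns the transaction ids to acknowledge and the new chaser position.
    """
    end = max(chaser_position, flushed_position)
    acks = [
        record["transaction_id"]
        for record in log[chaser_position:end]
        if record["type"] == "commit"
    ]
    return acks, end


log = [
    {"type": "prepare", "transaction_id": 1},
    {"type": "commit", "transaction_id": 1},
    {"type": "commit", "transaction_id": 2},  # written, but not flushed yet
]

acks, position = chaser_iteration(log, chaser_position=0, flushed_position=2)
print(acks, position)
```

The commit of transaction 2 is acknowledged only in a later iteration, once the flushed position moves past it.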

Sum up
One cannot deny the beauty and simplicity of this solution. One component tries to flush as fast as possible, or batches a few messages if it cannot endure the pressure. Another one waits for the position up to which the file is flushed to increase. Once it changes, it reads the record (from the in-memory chunk matched with the file on disk), processes it and sends acks. Simple and powerful.

Should companies orient their IT toward service/product

The question in the title of this entry arose after a discussion with a colleague of mine. The initial question was whether a library with contracts should be named:

  1. after the consumer, embedding its name in the contract, marking it as ‘it’s for this app’
  2. after the provider’s functionality, marking it as ‘this service part provides this functionality’

The answer may be obvious from the development perspective. The second option provides an abstraction over the given functionality, hence I prefer it over the first. What does it mean for your organization? Well, you have just defined a contract of a service. If you have a system with up to a few abstractions like this and a team behind it, you’ve got a real service, a real product owned by the team. In the long term, this way of thinking may result in:

  1. team’s ownership feeling
  2. proper contracts versioning (as it is based on)
  3. top-bottom understanding of service boundaries and responsibilities
  4. distilling a single API for given set of functionalities

The other way may be good because:

  1. other teams may influence or dictate interfaces they need
  2. versioning may be much simpler (one consumer for the contract)

Which one should you choose? There’s one point I didn’t mention before, which shows the winner. It’s the entanglement factor. The provider-oriented solution introduces a one functionality-one API rule and makes consumers obey the rules the team wants to be obeyed, for example the max number of items returned per request. The consumer-oriented one is very similar to sharing the most internal parts of the system, almost like… db integration. A service team has to maintain and version multiple interfaces. Let us count it, given

  • n services
  • each service depends on 3 others

when the provider-oriented solution is chosen (one service-one API), the total number of published contracts is n. In the consumer-oriented case it’s 3n, and the responsibility for maintenance and fixes is blurry. The latter reminds me of an entangled web with no owners. Everything is nobody’s nothing.
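
The counting can be checked on a toy dependency graph. The layout below, where service i consumes the next three services, is made up; any layout in which each service depends on 3 others gives the same totals, as long as every service is consumed by someone.

```python
def dependency_edges(n, fan_out=3):
    """Pairs (consumer, provider): service i consumes the next fan_out services."""
    return {(i, (i + k) % n) for i in range(n) for k in range(1, fan_out + 1)}


def contracts_named_after_provider(edges):
    """One contract per providing service, shared by all of its consumers."""
    return len({provider for _, provider in edges})


def contracts_named_after_consumer(edges):
    """One contract per consumer-provider pair."""
    return len(edges)


edges = dependency_edges(10)
print(contracts_named_after_provider(edges), contracts_named_after_consumer(edges))
```

For 10 services this gives 10 contracts in the first case and 30 in the second.
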
In my opinion, the bigger the shift toward the service/product paradigm, with the team owning the product rather than just the code, the healthier the IT teams and the products they make.
How about your company? Is it oriented around services?

Reconciliation between systems

From time to time a system is replaced with another system capable of doing more, or of doing the same thing better. It’s quite common to ask whether any data is lost, or whether the new system preserves the needed behaviors of the old one. Sometimes it’s a human-application comparison, when a procedure followed by people is replaced with an application; sometimes it’s a question of an old system vs a new system.

Cassandra integrity check
Let’s presume one migrates some old-fashioned SQL system to a Cassandra-based solution, given the following:

  1. total payload of daily data is quite high
  2. data are written to the Cassandra cluster (more than one node) with ConsistencyLevel LocalQuorum
  3. there should be a possibility to check whether all the data stored in the previous systems are written to the new one

After a bit of consideration one can propose that, as the data are written with LocalQuorum, they should be queried with the same level and matched against the old solution. This ensures that the data which have been written are the data being read (the famous R + W > N). It could cost a lot, as each query hits floor(N/2) + 1 replicas, where N is the replication factor, and the daily payload is streamed through the network twice: once to the coordinator and a second time to the client. Can we do this better?
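
The R + W > N rule is simple arithmetic: if the number of replicas read plus the number of replicas written exceeds the replication factor, every read overlaps with every write in at least one replica. A small sketch with hypothetical helper names:

```python
def quorum(replication_factor):
    """Number of replicas forming a quorum: more than half of them."""
    return replication_factor // 2 + 1


def read_overlaps_write(read_replicas, written_replicas, replication_factor):
    """True when every read is guaranteed to touch a replica holding the write."""
    return read_replicas + written_replicas > replication_factor


n = 3
print(quorum(n), read_overlaps_write(quorum(n), quorum(n), n))
print(read_overlaps_write(1, quorum(n), n))
```

With a replication factor of 3, writing and reading with a quorum of 2 replicas overlaps, while reading from a single replica after a quorum write does not. That is why reading with One needs something more, as discussed next.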

Possibly faster integrity check
How about using a consistency level of One? How can this be done while ensuring that the given node contains all the needed data? By running repair in your local data center on each node, one can ensure that each node contains all the data it’s responsible for. Then querying with One is ok. What’s important about nodetool repair is that it does not stream data if it’s not needed. The information sent to check whether the given node contains all the data is a Merkle tree, a tree made of hashes of hashes of hashes of… Sending this structure is cheap and doesn’t load your network so much.
If you consider (or know) that running repairs daily is a heavy task for your cluster, you’ll be happy to read about Cassandra 2.1 repair improvements, including incremental repairs.
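
The hashes-of-hashes idea can be shown with a toy Merkle root over a list of rows. This is a simplification for illustration only: Cassandra builds its trees over token ranges and compares them node by node, while the sketch below just computes a single root with SHA-256, and the function name is an assumption.

```python
import hashlib


def merkle_root(rows):
    """Hash every row, then repeatedly hash pairs of hashes until one is left."""
    level = [hashlib.sha256(row).digest() for row in rows]
    if not level:
        return hashlib.sha256(b"").digest()
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])  # duplicate the last hash to get full pairs
        level = [
            hashlib.sha256(level[i] + level[i + 1]).digest()
            for i in range(0, len(level), 2)
        ]
    return level[0]


replica_a = [b"row-1", b"row-2", b"row-3"]
replica_b = [b"row-1", b"row-2", b"row-3"]
replica_c = [b"row-1", b"row-X", b"row-3"]

print(merkle_root(replica_a) == merkle_root(replica_b))
print(merkle_root(replica_a) == merkle_root(replica_c))
```

Two replicas holding the same rows produce the same 32-byte root, so nothing but the hashes has to travel over the network. Only when the hashes differ do the underlying rows need to be exchanged.
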

So stop complaining about your good old-fashioned RDBMS and get yourself a new shiny cluster of Cassandra nodes :)