Process manager in event sourcing

There is a pattern which can be used to orchestrate the collaboration of different aggregates, even if they are located in different contexts/domains. This pattern is called a process manager. It handles events which may result in actions on different aggregates. How can one create a process manager handling events from different sources? How should one design storage for a process manager?

In my latest take on event sourcing I followed the same direction as EventStore. My first requirement was to have a single natural number describing the sequence number of each event committed in a given context/domain (represented as a module). Because an auto-incrementing identity in a relational database table keeps growing even when some events are rolled back by a transaction, this results in a monotonically increasing position number for any event appended in the given context/domain. It lets you use the number of the last read event as a cursor for reading events forward. Now you can imagine that having multiple services results in having multiple logs with this property of monotonically increasing positions, for example:

  • Orders: e1, e2, e3, e6
  • Notifications: e1, e2, e3, e4

If a process manager reads from multiple contexts/domains, you can easily come to the conclusion that all you need to store is the last value of the cursor for each domain. Once an event is dispatched, meaning the process manager has finished handling it, the cursor value for the context the event was created within is updated. This is a simple but powerful tool for creating process managers with an at-least-once processing guarantee for all the events they have to handle.
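A minimal sketch of such a process manager, assuming a hypothetical EventLog abstraction and keeping the cursors in memory (a real implementation would persist them), could look like this:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical event read from a context/domain log at a given position.
record Event(long position, String type, byte[] payload) {}

// Hypothetical source of events: reads forward from a given position.
interface EventLog {
    List<Event> readForward(long fromExclusive, int maxCount);
}

final class ProcessManager {
    // One cursor per context/domain the process manager reads from.
    private final Map<String, Long> cursors = new ConcurrentHashMap<>();
    private final Map<String, EventLog> logs;

    ProcessManager(Map<String, EventLog> logs) {
        this.logs = logs;
    }

    void pump(String context) {
        long cursor = cursors.getOrDefault(context, 0L);
        for (Event e : logs.get(context).readForward(cursor, 100)) {
            handle(context, e);                 // may result in actions on different aggregates
            cursors.put(context, e.position()); // advance the cursor only after handling
            // A crash between handle() and the cursor update means the event
            // is read and handled again on restart: at-least-once processing.
        }
    }

    private void handle(String context, Event e) {
        // dispatching logic goes here
    }
}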

If a process manager guarantees processing events at least once and can perform actions on aggregates, it may perform a given action more than once, because the action on the aggregate and saving the state of the PM are not transactional. It's easy to see: just imagine the machine crashing after the action but before marking the event as dispatched. How can we ensure that no event will result in a doubled action? That will be the topic of the next post.

Business Driven Development

If you're into software development you've probably heard about Behavior-driven development. Recently I had a discussion about whether or not business people actually think in this way. Fortunately, I was involved in a business workshop, so I could make some observations.
The way mentioned above is the only language business uses to define and discuss aspects of their actions. There are shorthands like:
Once we reach 1000 participants, we assign them rooms
Which can be easily translated into

  • Given 999 participants registered
  • When a participant registers
  • Then the rooms are allocated

This can be read as easily by business people as by developers.
If you can model your solutions towards this kind of testing, which does not necessarily have to be performed with BDD tools but can easily be done by introducing Event Sourcing and structuring your tests like the Lokad CQRS examples, then you can finally start to discuss business ideas with the business instead of describing how your db is updated. And this, for sure, makes the difference.
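As a sketch of what such a test can look like, here is the registration example above expressed as given/when/then over a hypothetical, minimal event-sourced aggregate (the event and command names are made up for illustration):

import java.util.ArrayList;
import java.util.List;

// Hypothetical events and command, for illustration only.
record ParticipantRegistered(String id) {}
record RoomsAllocated(int participants) {}
record RegisterParticipant(String id) {}

// A tiny aggregate rebuilt from its event history.
class Conference {
    private int participants;

    static Conference from(List<Object> history) {
        Conference c = new Conference();
        history.forEach(c::apply);
        return c;
    }

    List<Object> handle(RegisterParticipant cmd) {
        List<Object> published = new ArrayList<>();
        published.add(new ParticipantRegistered(cmd.id()));
        if (participants + 1 == 1000) {
            published.add(new RoomsAllocated(1000));
        }
        published.forEach(this::apply);
        return published;
    }

    private void apply(Object event) {
        if (event instanceof ParticipantRegistered) {
            participants++;
        }
    }
}

class ConferenceSpec {
    public static void main(String[] args) {
        // given: 999 participants registered
        List<Object> given = new ArrayList<>();
        for (int i = 1; i <= 999; i++) given.add(new ParticipantRegistered("p-" + i));

        // when: a participant registers
        List<Object> then = Conference.from(given).handle(new RegisterParticipant("p-1000"));

        // then: the rooms are allocated
        assert then.contains(new RoomsAllocated(1000));
        System.out.println(then);
    }
}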

CRUD chat

A: Hello, have you CREATEd a new car?
B: No! I just UPDATEd its Owner field, setting it to my id. I needed to UPDATE the balance field of my Account row as well.
A: Oh, I see. Yesterday Tom DELETEd a few employees. They were stealing money. Unfortunately there is a transition period, so first he needed to UPDATE their IsActive to false, then after the period he could finally DELETE them.
B: Yes, that’s the way you do it.

No it’s not. People do not use only four verbs to describe their activities, and if they do, they have a real problem. The vocabulary used by business people, as well as everyone else, is much wider, and there is a reason behind it. You can name everything a THING and use only four CRUD verbs to describe activities, but instead of meaningful phrases you get long sentences filled with clarifications. Using a vocabulary consisting of only a few words will not only increase the number of words needed to describe something but will for sure lose some of the meaning. Can you afford that? Can your company afford it as well?

Optimisation of queries against event-sourced systems

I hope you’re familiar with the event sourcing pattern. Switching from update-this-row to a more behavioral paradigm of designing systems is one of the most influential trends for sure. But how can a system storing only business deltas/events be queried? Can it be queried at all?

Project your views

To answer this need you can use projections. A projection is nothing more than a function applied to all events, or to selected ones (by stream id, event category, or any dimension you can get from the processed event). A similar solution can be found in Lokad CQRS, described in this post. So yes, there is a way of applying all the needed events onto a view/projection, which can then be queried with the given parameters. Is there a way of optimizing the query responses?
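As a minimal sketch, with hypothetical event types, such a projection is just a function folding the selected events into a view that can later be queried:

import java.util.HashMap;
import java.util.Map;

// Hypothetical events, for illustration.
record UserRegistered(String userId, String name) {}
record UserNameChanged(String userId, String newName) {}

// A projection is nothing more than a function applied to each event,
// mutating a view that can later be queried with the given parameters.
class UserNameProjection {
    private final Map<String, String> namesById = new HashMap<>();

    void apply(Object event) {
        if (event instanceof UserRegistered e) {
            namesById.put(e.userId(), e.name());
        } else if (event instanceof UserNameChanged e) {
            namesById.put(e.userId(), e.newName());
        }
        // all other events are ignored by this projection
    }

    String query(String userId) {
        return namesById.get(userId);
    }
}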

Fast response

For sure there is! Let’s consider a projection replaying all the events for users changing their personal data, but applying only those which modify the user name. This is probably a very infrequent operation. The projection stores an id->name map, which is used by various services to display the user-friendly name. What can be done to improve the performance of the service storing this kind of mapping?

Consider storing two event sequence numbers:

  1. one for remembering up to which position the events have been scanned
  2. one for remembering the last user-related event which actually changed the mapping

The second can easily be used as an ETag for all the responses. If the operation is infrequent, the responses can be answered with 304 for long periods of time. This ETag-based optimization can always be applied: the more sparsely the projection state changes, the better the chance of reusing the client-cached response.
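A sketch of keeping both numbers and using the second one as an ETag might look as follows (the event type is the same hypothetical one as in the previous sketch, and the HTTP plumbing is left out):

import java.util.HashMap;
import java.util.Map;

// Same hypothetical event as in the previous sketch.
record UserNameChanged(String userId, String newName) {}

class CachedUserNameProjection {
    private final Map<String, String> namesById = new HashMap<>();
    private long scannedUpTo;   // 1. position up to which events have been scanned
    private long lastChangeAt;  // 2. position of the last event that changed the mapping

    void apply(long position, Object event) {
        scannedUpTo = position;
        if (event instanceof UserNameChanged e) {
            namesById.put(e.userId(), e.newName());
            lastChangeAt = position;
        }
    }

    // The second number becomes the ETag of every response.
    String currentETag() {
        return "\"" + lastChangeAt + "\"";
    }

    // Returns null when the client's cached copy is still valid (respond with 304 Not Modified).
    String respond(String ifNoneMatchHeader, String userId) {
        if (currentETag().equals(ifNoneMatchHeader)) {
            return null;
        }
        return namesById.get(userId); // 200 with the body and ETag: currentETag()
    }
}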

Cacheable REST API for time series

It’s been a while since the last blog post and since mapping back the blog domain. Let’s restart with something fancy: a REST API!

The problem
Let’s look at any time series gathered in an eventually consistent medium (with a given time threshold). The entries/events gathered from various inputs are stored in a persistent data store. Queries for data are nothing more than requests for a given time slice. How could one design a REST API for accessing such a store?

The solution

Let’s start with the first request for a sample timeseries.scooletz.com link. Reading it, expressed in ASP MVC Web API routing terms, would be something like:

GET http://timeseries.scooletz.com/api/data

As the result I’d return a list of entries from now - t1 seconds till now - t2 seconds. t1 and t2 are selected arbitrarily to match the eventual nature of the store and the process of data gathering. Of course t1 > t2.

How can navigation be provided for it? Nothing simpler comes to my mind than using the Link header, which lets you provide navigation in a semantic way. The proposed value would be:

Link: <http://timeseries.scooletz.com/api/data/before/2014_09_29_23_59_10>; rel="prev"

This would allow getting the page previous to the current head. Why is the second part a multiple of 10? That’s because of the time-chunk size. I’ve chosen to group all the entries into 10-second chunks. This also means that the first request can contain anything from no data to a full chunk span. There is an additional reason, revealed later.

The second page, besides the data placed in the body of the response, would have the following value of the Link header:

Link: <http://timeseries.scooletz.com/api/data/before/2014_09_29_23_59_00>; rel="prev",
<http://timeseries.scooletz.com/api/data/after/2014_09_29_23_59_10>; rel="next"

The third page, and all the following ones, would contain the following structure of the Link header:

Link: <http://timeseries.scooletz.com/api/data/before/2014_09_29_23_58_50>; rel="prev",
<http://timeseries.scooletz.com/api/data/before/2014_09_29_23_59_10>; rel="next"

As you can see, from the third page on, all the pages use before links for navigation. There is a very good reason for it: never-expiring HTTP cache headers can be set for data in the past, which does not change. The second page could be cached, but the after link would always be answered with a non-cacheable response. As time passes, the response for http://timeseries.scooletz.com/api/data/before/2014_09_29_23_59_10 would change its Link header to

Link: <http://timeseries.scooletz.com/api/data/before/2014_09_29_23_59_00>; rel="prev",
<http://timeseries.scooletz.com/api/data/before/2014_09_29_23_59_20>; rel="next"

These principles can be summed up as:

  1. cache-never the root request for data
  2. cache-forever before responses as they point to the consistent past
  3. cache-never after responses as they point to data still being gathered

This results in only the first request being non-cacheable when the client moves into the past (using prev links). One can find it very helpful with an infinite-scroll paradigm: only the data for the first few entries would be fetched; all the rest would come from the browser/client cache.
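A sketch of these rules, following the 10-second chunks and header values from the examples above (the hosting framework is left out, and the class and method names are made up):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Sketch of the paging/caching rules for the time-series API described above.
class TimeSeriesLinks {
    private static final DateTimeFormatter CHUNK =
            DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss").withZone(ZoneOffset.UTC);
    private static final long CHUNK_SECONDS = 10;

    // Entries are grouped into 10-second chunks, hence the trailing 0 in the links.
    static Instant chunkStart(Instant timestamp) {
        long epoch = timestamp.getEpochSecond();
        return Instant.ofEpochSecond(epoch - (epoch % CHUNK_SECONDS));
    }

    static String beforeLink(Instant chunk) {
        return "<http://timeseries.scooletz.com/api/data/before/" + CHUNK.format(chunk)
                + ">; rel=\"prev\"";
    }

    // 1. the root request is never cached,
    // 2. 'before' responses point to the immutable past and are cached forever,
    // 3. 'after' responses point to data still being gathered and are never cached.
    static String cacheControlFor(String segment) {
        return "before".equals(segment) ? "public, max-age=31536000"
                                        : "no-cache, no-store";
    }
}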

ETags

You should remember about ETags as well. Firefox, for instance, when a user hits F5, issues a request with max-age=0, trashing the caches. If you add an ETag equal to the date included in the before link, you can verify it on the server side and immediately respond with 304 Not Modified. The before links contain immutable data after all :)

Latency vs throughput

There are two terms which you should consider when designing your system. The more robust and the bigger the system you design, the deeper your understanding of these two values should be.

Throughput
Throughput is nothing more than the number of operations per unit of time that your system can process. For instance, in the case of a website one may want to easily handle one thousand requests per second. To define the needed throughput you can use an estimation like:

given the number of users concurrently using the system set to 1000,
given the estimated number of user actions per second set to 1,
the system should have a throughput equal to 1000 req/s

Is it a good estimation? I’d reconsider for sure:

  1. peak values of concurrent users. In the majority of systems there are hours when your servers do nothing. On the other hand, there are hours when all of your users are logged in
  2. the number of actions per second. The value of 1 operation/s may be right for a person seeing a computer for the very first time. It’s much lower than a standard PC user’s pace

The obvious thing one can do to increase throughput is batching. It’s cheaper to write and fsync/FlushFileBuffers after writing a batch of entries rather than syncing after every single write. The same goes for network IO: sending a bigger frame containing more messages leads to increased throughput.
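A minimal sketch of the file IO case (the class is made up; FileChannel.force plays the role of fsync/FlushFileBuffers):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

class BatchedLog {
    private final FileChannel channel;

    BatchedLog(Path file) throws IOException {
        this.channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
    }

    // Naive approach: one sync per entry; throughput is bounded by the disk sync cost.
    void appendOneByOne(List<byte[]> entries) throws IOException {
        for (byte[] entry : entries) {
            channel.write(ByteBuffer.wrap(entry));
            channel.force(false); // fsync / FlushFileBuffers equivalent
        }
    }

    // Batched approach: write the whole batch, then pay for a single sync.
    void appendBatched(List<byte[]> entries) throws IOException {
        for (byte[] entry : entries) {
            channel.write(ByteBuffer.wrap(entry));
        }
        channel.force(false); // one sync amortized over the whole batch
    }
}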

Latency
Latency is the time until request completion. You should forget about the silly average value and go for the median, quartiles and percentiles, especially the 99th, 99.9th and beyond. Don’t be fooled by calculating the average latency across a whole day. Especially for systems under heavy load, these many-nines cases will be more common than you think. To get a taste of it you should definitely watch Gil Tene discussing some common pitfalls encountered in measuring and characterizing latency.
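For illustration only, a naive nearest-rank computation over a handful of sample latencies shows why the tail matters far more than the average (in practice a histogram such as HdrHistogram is a better fit):

import java.util.Arrays;

class LatencyStats {
    // Naive nearest-rank percentile over recorded latencies in milliseconds.
    static long percentile(long[] latenciesMs, double p) {
        long[] sorted = latenciesMs.clone();
        Arrays.sort(sorted);
        int index = (int) Math.ceil(p / 100.0 * sorted.length) - 1;
        return sorted[Math.max(0, index)];
    }

    public static void main(String[] args) {
        long[] ms = {4, 5, 5, 6, 7, 9, 12, 15, 40, 250};
        System.out.println("average = " + Arrays.stream(ms).average().orElse(0)); // 35.3, hides both ends
        System.out.println("median  = " + percentile(ms, 50.0));                  // 7
        System.out.println("p99     = " + percentile(ms, 99.0));                  // 250, what some users actually feel
    }
}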

Throughput vs latency
Having these definitions, is it good enough to ask only for maximized throughput? My answer is that it isn’t.

Without defined and measured latency, maximizing throughput comes down to batching requests for the slowest resources as heavily as possible, which makes individual requests wait longer.

You should satisfy other requirements as well, or at least provide meaningful statistics, like Cassandra’s MBeans or EventStore queue lengths.

Storm processor – bolts and joins

Storm, which recently moved to the Apache foundation, is a powerful stream processing library open sourced by Twitter. It provides the resources needed to scale processing out across multiple machines, with at-least-once guarantees, or exactly-once using Trident. The library is based on two basic elements:

  1. spouts – sources of tuple streams
  2. bolts – processing units, consuming and emitting different streams of tuples

which are combined into a topology, a mesh of elements emitting and consuming events in the order of data processing.
Streams, unlike in EventStore, are not cheap and represent a logical flow of data rather than an aggregate boundary. One stream can be emitted by more than one spout or bolt. Further discussion of streams is beyond the scope of this article.
The bolt declarer, used in the topology builder, implements plenty of interfaces that allow you to define which tuples of which streams are consumed. It lets you assign a given bolt instance to handle a given set of tuples from a given stream of data, for example:

  1. fieldsGrouping lets you bind tuples from the given stream which contain the declared fields to the given instance of a bolt. What it means is that tuples with a given field value will be routed to the same instance of the bolt class! This provides a very powerful behavior, letting you group tuples by any dimension
  2. localOrShuffleGrouping gives you the ability to route data within the same worker process or, if that condition cannot be satisfied, to move data to another worker selected ‘randomly’. When no grouping is needed, this lets you improve performance by collocating execution and skipping the network overhead.

A bolt isn’t limited to consuming tuples from only one grouping. It can join multiple streams grouped in multiple ways. This brings another opportunity for data repartitioning. For example, an application emitting streams of data such as production exceptions and user transactions can easily raise an alarm when a given user experiences more than one exception every 10 transactions. A simple bolt using two fields groupings can deal with it easily.
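A minimal topology sketch of that alarm example (the spout and bolt classes are hypothetical and assumed to be defined elsewhere; the builder and grouping calls are the Storm API, shown here with the org.apache.storm packages, which older releases shipped as backtype.storm):

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

class AlarmTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // Hypothetical spouts emitting tuples that carry a "userId" field.
        builder.setSpout("transactions", new TransactionSpout(), 2);
        builder.setSpout("exceptions", new ExceptionSpout(), 2);

        // fieldsGrouping routes all tuples with the same userId to the same
        // bolt instance, so the bolt can join both streams per user and
        // raise an alarm on more than one exception every 10 transactions.
        builder.setBolt("alarms", new AlarmBolt(), 4)
               .fieldsGrouping("transactions", new Fields("userId"))
               .fieldsGrouping("exceptions", new Fields("userId"));

        // A downstream bolt that needs no grouping can stay collocated with
        // its upstream worker and skip the network hop.
        builder.setBolt("reporter", new ReportBolt(), 4)
               .localOrShuffleGrouping("alarms");
    }
}

Both groupings route by the userId field, so a single AlarmBolt instance sees the full picture for each user.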
I hope this short introduction will encourage you to dive into Storm. It’s a very powerful tool, especially in Complex Event Processing, with scale out possibilities.