Aggregate, an idempotent receiver

In the previous post I covered the process manager subscribing to and consuming events from multiple sources. Additionally, it was shown that saving the position of the read logs after performing an action is sufficient to get at-least-once delivery (retry in case of errors).

Let me consider an aggregate on which an action is invoked. As the only transactional boundary that can be used is the aggregate itself, we'll add two pieces of data to each call from the process manager:

  1. a hash (unique, SHA1 probably) of the process manager identifier and the name of the origin module the handled event was taken from
  2. the order number of the handled event

These two values, combined in an event, make it possible to check whether the action has already been applied and to skip it if needed. Everything happens in one transaction.
As order numbers for a given hash can only increase, the state of this idempotent receiver can be modeled as a dictionary with the SHA1 value as its key and the order number as its value.
The only disadvantage is the additional event added to the aggregate for each action performed within a process manager. Fortunately, a scavenging process, similar to the one in EventStore, can help: when events are dumped to a file from the store of your choice, only the last value for a given SHA1 hash needs to be kept.
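A minimal sketch of this receiver state in Python; the class and method names are my illustration, not part of the original design, and the persistence itself is left out:

    class IdempotentReceiver:
        def __init__(self):
            # SHA1 of (process manager id + origin module) -> last handled order number
            self._last_handled = {}

        def should_apply(self, source_hash, order_number):
            # Order numbers for a given hash only increase, so anything at or
            # below the recorded value has already been applied and can be skipped.
            return order_number > self._last_handled.get(source_hash, -1)

        def mark_applied(self, source_hash, order_number):
            # In the real aggregate this is recorded as an event in the same
            # transaction as the action itself.
            self._last_handled[source_hash] = order_number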

Process manager in event sourcing

There is a pattern which can be used to orchestrate the collaboration of different aggregates, even if they are located in different contexts/domains. This pattern is called a process manager. It handles events which may result in actions on different aggregates. How can one create a process manager handling events from different sources? How should storage for a process manager be designed?

In my latest take on event sourcing I followed the very same direction taken by EventStore. My first requirement was to have a single natural number describing the sequence of each event committed in a given context/domain (represented as a module). Because an auto-incrementing identity in a relational database table keeps growing even when an event is rolled back by a transaction, this results in a monotonically increasing position number for any event appended in the given context/domain. That lets you use the number of the last read event as a cursor for reading events forward. Now you can imagine that having multiple services results in multiple logs with this property of monotonically increasing positions, for example:

  • Orders: e1, e2, e3, e6
  • Notifications: e1, e2, e3, e4

If a process manager reads from multiple contexts/domains, you can easily come to the conclusion that all you need to store is the last cursor value for each domain. Once an event is dispatched, in the sense that the process manager has finished handling it, the cursor value for the context the event was created within is updated. This is a simple but powerful tool for creating process managers with an at-least-once processing guarantee for all the events they have to process.
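A minimal sketch of this cursor bookkeeping (Python; the module names and the reader callback are assumptions made for illustration):

    class ProcessManager:
        def __init__(self):
            # one cursor per source log, e.g. the contexts listed above
            self.cursors = {"Orders": 0, "Notifications": 0}

        def consume(self, module, read_events_after):
            # read_events_after(position) yields (position, event) pairs
            # appended after the given position
            for position, event in read_events_after(self.cursors[module]):
                self.handle(event)               # may invoke actions on aggregates
                self.cursors[module] = position  # saved only after handling, so a
                                                 # crash in between means redelivery:
                                                 # at-least-once processing

        def handle(self, event):
            pass  # dispatch actions to aggregates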

If a process guarantees at-least-once processing of events and can perform actions on aggregates, it may perform a given action more than once, since acting on an aggregate and saving the state of the PM isn't transactional. It's easy to see: just imagine the machine crashing after the action but before the event is marked as dispatched. How can we ensure that no event results in a doubled action? This will be the topic of the next post.

Business Driven Development

If you're into software development you've probably heard about Behavior-driven development. Recently I had a discussion about whether or not business people think in this way. Fortunately, I was involved in a business workshop, so I could make some observations.
The style mentioned above is the very language business uses to define and discuss aspects of their actions. There are some shorthands like:
Once we reach 1000 participants, we assign them rooms
Which can be easily translated into

  • Given 999 participants registered
  • When a participant registers
  • Then the rooms are allocated

This can be read as easily by business people as by developers.
If you can model your solutions towards this kind of testing, which doesn't necessarily have to be performed with BDD tools but can easily be done by introducing Event Sourcing and structuring your tests like the Lokad CQRS examples, then you can finally start to discuss business ideas with the business instead of describing how your db is updated. And this, for sure, makes the difference.
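To make the translation concrete, here is a minimal sketch of such a test over an event-sourced aggregate (Python; the Conference domain and all its names are invented for illustration, not taken from the Lokad CQRS samples):

    from dataclasses import dataclass

    @dataclass(frozen=True)
    class ParticipantRegistered:
        number: int

    @dataclass(frozen=True)
    class RoomsAllocated:
        pass

    class Conference:
        CAPACITY = 1000

        def __init__(self, past_events):
            # rebuild state by counting registrations in the event stream
            self.registered = sum(
                1 for e in past_events if isinstance(e, ParticipantRegistered))

        def register_participant(self):
            events = [ParticipantRegistered(self.registered + 1)]
            if self.registered + 1 == self.CAPACITY:
                events.append(RoomsAllocated())
            return events

    # Given 999 participants registered
    given = [ParticipantRegistered(n) for n in range(1, 1000)]
    # When a participant registers
    produced = Conference(given).register_participant()
    # Then the rooms are allocated
    assert produced == [ParticipantRegistered(1000), RoomsAllocated()]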

CRUD chat

A: Hello, have you CREATEd a new car?
B: No! I just UPDATEd its Owner field, setting it to my id. I needed to UPDATE the balance field of my Account row as well.
A: Oh, I see. Yesterday Tom DELETEd a few employees. They were stealing money. Unfortunately there is a transition period, so first he needed to UPDATE their IsActive to false, then after the period he could finally DELETE them.
B: Yes, that's the way you do it.

No, it's not. People do not use only four verbs to describe their activities, and if they do, they have a real problem. The vocabulary used by business, as by everyone else, is much wider, and there is a reason behind it. You can name everything a THING and use only the four CRUD verbs to describe activities, but instead of meaningful phrases you get long sentences filled with clarifications. Using a vocabulary consisting of only a few words will not only increase the number of words needed to describe something but will surely lose some of the meaning. Can you afford that? Can your company afford it as well?

Embracing the domain leads a solution towards event-oriented design

One of the most powerful aspects of git is its simplicity. One can easily read the objects chapter of the Git book in one afternoon and learn that git stores nothing more than snapshots of the current state of the items added to the repository. If an item is repeated in multiple commits with no changes, it's referenced under the same SHA1 value and doesn't have to be stored twice. This decision is well explained by Linus here. The major point of the explanation is that an algorithm finding changes narrow enough and well described, like 'method declaration moved', etc., can be hard and costly to build. Storing a snapshot of the state is much easier and lets you run your algorithms much later. This allows algorithms to evolve while still working on the unmodified version of the state at any time.
What git does is store every state you commit. The commit object contains a reference to a tree object, which consists of other objects. This results in the state of any commit in the entire repository history being stored. It means that git never overwrites state; all it does is add more and more, with pointers to the parent states/commits. This allows you to run any tool/algorithm through the entire history of any branch.
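A minimal sketch of this content-addressed, append-only storage (Python; a toy illustration, not git's actual object format):

    import hashlib

    class SnapshotStore:
        def __init__(self):
            self._objects = {}  # SHA1 hex digest -> content bytes

        def put(self, content):
            key = hashlib.sha1(content).hexdigest()
            # identical content hashes to the same key, so it is stored once
            self._objects.setdefault(key, content)
            return key

        def get(self, key):
            return self._objects[key]

    store = SnapshotStore()
    first = store.put(b"class Foo: pass")
    again = store.put(b"class Foo: pass")  # unchanged item in a later commit
    assert first == again and len(store._objects) == 1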
These considerations, rooted in the git repository design, suggest the following paradigms for modelling.

State driven modelling
It's simple to store state. All you've got to do is serialize all the data and put it into the store, but… how many times have you written a system which performs updates? What about the earlier state? Is it preserved? Or maybe it's overwritten? One can hope that previous values are stored in some kind of audit log, but an audit log is not your previous state, is it? It's not the same. Nathan Marz discusses the fragility of updates in his talk here. Maybe storing a new state with a link to the previous one (no audit log, just the old value of the state) isn't that bad after all.
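A sketch of what linking states could look like (Python; illustrative names and an in-memory structure rather than a real store):

    from dataclasses import dataclass
    from typing import Optional

    @dataclass(frozen=True)
    class StateVersion:
        data: dict
        previous: Optional["StateVersion"] = None  # link to the prior state

    def update(current, **changes):
        # never overwrite: build a new version pointing back at the old one
        return StateVersion({**current.data, **changes}, previous=current)

    v1 = StateVersion({"owner": "Tom", "balance": 100})
    v2 = update(v1, owner="Ann")
    assert v2.previous is v1  # the old value is kept, not just audited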

Changed events
The second take on modelling would be embracing the changes with ___Changed events. You know a property/getter changed its value and it's good to audit it. This approach is often met in solutions which require audit logs. Storing a common 'name – old value – new value' tuple is easy. It may not be that simple to deal with actual domain changes, or to run your new algorithm over the state of a given entity through its entire history, but it is easy to implement. I'd consider it a poor man's solution for a person who doesn't want to invest his/her time in learning the domain. One can audit anything with this kind of paradigm. It's all text after all, isn't it?
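The whole paradigm boils down to a tuple like this (Python; a trivial illustration of the shape described above):

    from dataclasses import dataclass
    from typing import Any

    @dataclass(frozen=True)
    class PropertyChanged:
        name: str       # which property/getter changed
        old_value: Any
        new_value: Any

    # easy to record, but silent about the business meaning of the change
    entry = PropertyChanged("Owner", old_value="Tom", new_value="Ann")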

Event sourcing
The last take is event sourcing: capturing the business events which, applied onto the previous state, lead to the next one. This is also mentioned by Linus when he talks about clever algorithms calculating perfect deltas. To get one, to get the event and the transition/change it emits when applied, takes a lot of investment in understanding the domain. I can imagine that perfect events for a git repository of a C# project would contain events like:

  • method renamed
  • method moved
  • functionality added

Of course it may be impossible to provide this kind of information retrieval, but it shows the direction: a domain that is well understood and enriched with the right event types can be described minimalistically with a set of events.
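A minimal sketch of the "apply events onto the previous state" mechanic (Python; the event shapes are invented for illustration):

    from functools import reduce

    def apply_event(state, event):
        kind, payload = event
        if kind == "MethodRenamed":
            methods = dict(state["methods"])
            methods[payload["new"]] = methods.pop(payload["old"])
            return {**state, "methods": methods}
        return state  # events this projection doesn't care about change nothing

    initial = {"methods": {"Foo": "..."}}
    events = [("MethodRenamed", {"old": "Foo", "new": "Bar"})]
    current = reduce(apply_event, events, initial)
    assert "Bar" in current["methods"]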

Out of order commands

In the previous posts a simple mechanism for storing the information needed for operation idempotence was introduced: a simple hash table whose state is transactionally saved with the state of the object the sent operation was applied to. What about receiving operations out of order? What if the infrastructure (for instance, a messaging system) delivers one operation earlier than another which in reality occurred first?

It's time to make it explicit and start naming elements in the DDD manner. For the sake of reference, the object considered the subject of an operation is an aggregate root. The operation is of course a message. The modelling assumes event sourcing as the storage for aggregates' states.

Assume that the aggregate the command is sent to has a property called Version, incremented with each event applied to it. Assume then that each command contains a version number which is supposed to be equal to the aggregate's version. If, during dispatch, these two values are different, an exception is thrown and the command does not change the state of the aggregate. It's a simple optimistic concurrency implementation, allowing out-of-order commands sent to an object to be discarded.
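A minimal sketch of that dispatch check (Python; the class layout is my illustration of the rule, not a full aggregate implementation):

    class ConcurrencyError(Exception):
        pass

    class AggregateRoot:
        def __init__(self):
            self.version = 0  # incremented with each applied event

        def dispatch(self, command):
            if command.expected_version != self.version:
                # out-of-order (or concurrent) command: reject, change nothing
                raise ConcurrencyError(
                    f"expected {command.expected_version}, actual {self.version}")
            for event in self.handle(command):
                self.apply(event)
                self.version += 1

        def handle(self, command):
            return []  # domain logic emits events here

        def apply(self, event):
            pass  # state transition per event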

To make it more interesting, consider a sharded system where specific aggregates are stored on different nodes (but each aggregate is stored on exactly one node). An aggregate's events (state changes) have to be propagated across all the nodes/shards in the same idempotent manner as commands sent to aggregates. It would be easy to use a hashtable on each node with the very same key, aggregateId plus version, but that would mean storing all the pairs of aggregate identifiers and their versions, which could possibly bring down each of your nodes (or make you use GBs of memory). Could the trivial fact that the version increases with every event on the aggregate be used for some optimization? You'll see in the next entry.

Idempotence, pt. 2

In the previous post a few operations were considered as to whether or not they are idempotent. For the sake of reference, here they are:

  • Marked as default
  • Money transfer ‘500$’ ordered to ‘x’ account
  • Label ‘leave sth for the future month’ added

If we consider 'idempotent' to describe an operation which can be applied multiple times in a row with the same result, then all the operations overriding previous values of some properties are idempotent. Having some entity marked as default 5 times does not change the fact that it is default. That's for sure. What about provisioning the 'x' account with 500$? Can this type of operation be reapplied multiple times? Of course not, because it does not override any property; it changes the state by interacting with the previous one. The same goes for 'labelling', at least if no compensation is introduced (selecting only unique labels before saving, which would allow reapplying).

What if you want your system to be resistant to operations resent multiple times? The simplest solution is to add a unique identifier to each operation and store the identifiers in a lookup (hashtable). Each time an operation arrives, the lookup is checked to see whether this operation was already processed. If so, skip it.

There is one additional condition: the lookup must be transactional with the storage you save the states in. This condition is a simple 'all-or-none' for storing the result of an operation together with the fact that this specific operation was applied. Otherwise, if the lookup were updated first and storing the state after the operation failed, the operation would be marked as processed but no change would be saved. The same applies to a situation where the lookup is updated at the very end: the operation result is saved, adding info about the operation to the lookup fails, and the next time the same operation arrives it is applied one more time. That said, the lookup must be transactional with the medium where state is saved.
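A minimal sketch of the skip-or-apply flow (Python; the "transaction" is simulated by swapping both structures in one step, where a real system would cover both writes with one database transaction):

    class Store:
        def __init__(self):
            self.state = {}
            self.processed = set()  # lookup of already applied operation ids

        def execute(self, op_id, change):
            if op_id in self.processed:
                return  # already applied: skip the duplicate
            # apply the change and record the operation id "all-or-none"
            new_state = {**self.state, **change}
            self.state, self.processed = new_state, self.processed | {op_id}

    store = Store()
    store.execute("op-1", {"balance": 500})
    store.execute("op-1", {"balance": 500})  # resent: no further effect
    assert store.state == {"balance": 500}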