Hot or not? Data inside of Service Fabric

TL;DR

When calculating the space needed for your Service Fabric cluster, especially in Azure, you can hit machine limits. After all, a D2 instance has only 100 GiB of local disk and this is the disk Service Fabric stores its data on. 100 GiB might not sound that small, but if you use your cluster for more than one application, you can hit the wall.

Why local?

There’s a reason behind using the local, ephemeral disk for Service Fabric storage. The reason is locality. As Service Fabric replicates the data, you don’t need to keep it in highly available storage; the cluster provides that on its own. Storing multiple copies in Azure Storage Services is simply not needed. Additionally, using local SSD drives is much faster. It’s a truly local disk after all.

Saturation

Service Fabric is designed to run many applications with many partitions. After all, you want to keep your cluster (almost) saturated; using a few VMs just to run an app that is needed once a month would be wasteful. Again, if you run many applications, you need to think about capacity. Yes, you might be running stateless services which don’t require storage, but not using stateful services would be a waste. They provide an efficient, transactional, replicated database built right into them. So what about the data? What if you saturate the cluster not in terms of CPU but of storage?

Hot or not

One of the approaches you could use is the separation between hot and cold data. For instance, users who haven’t logged in for a month could have their data considered cold. This data could be offloaded from the cluster to Azure Storage Services, leaving more space for the data that is actually needed. When writing applications that use an append-only model (for instance ones based on event sourcing) you could think about offloading events older than X days, at the same time ensuring that they can still be accessed. Yes, the access will be slower, but it’s unlikely that you’ll need them on a regular basis.
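
As an illustration, here is a sketch of such offloading, assuming the classic WindowsAzure.Storage blob client; ColdEventOffloader, the container name and the stream/version identifiers are hypothetical names used for the example only:

using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

public class ColdEventOffloader
{
    readonly CloudBlobContainer container;

    public ColdEventOffloader(string connectionString)
    {
        // one container per application keeps cold events addressable later on
        var account = CloudStorageAccount.Parse(connectionString);
        container = account.CreateCloudBlobClient().GetContainerReference("cold-events");
    }

    // Moves a single event older than the cut-off out of the cluster.
    // 'streamId' and 'version' identify an event within an event-sourced stream.
    public async Task OffloadAsync(string streamId, long version, byte[] payload)
    {
        var blob = container.GetBlockBlobReference($"{streamId}/{version:D10}");
        await blob.UploadFromByteArrayAsync(payload, 0, payload.Length);
        // after a successful upload the event can be removed from the local store,
        // freeing disk space while keeping the data accessible (just slower)
    }
}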

Summary

When designing your Service Fabric apps and planning your cluster capacity, think through the hot/cold approach as well. It could lower your storage requirements and let you use the same cluster for more applications, which is effectively what Service Fabric is for.

How does Service Fabric host your services?

TL;DR

Service Fabric provides amazing, fully automated hosting for any number of services with any number of instances each (up to the physical limits of your cluster). But how are they hosted? What if you have more partitions than nodes?

Structure recap

When building an app that is meant to be hosted in Service Fabric you build an… app. This application might consist of multiple stateful and stateless services. The application is packaged into a… package that, when uploaded to the cluster as an image, provides an application type. From this, you can instantiate multiple application instances. Let me give you an example.

Application “Bank” consists of two services:

  1. “Users”
  2. “Accounts”

When built with version “1.0.0” and packaged, it can be uploaded to the SF cluster and is registered as “Bank 1.0.0”. From then on you can instantiate as many banks as you want within your cluster. Each will be composed of two sets of services: “Users” and “Accounts”.

Services, stateful services

When defining stateful services, that is those that have a kind of built-in database (reliable collections, or SewingSession provided by my project SewingMachine), you need to define how many partitions they will have. You can think of partitions as separate databases. Additionally, you define the number of replicas every partition will have. That’s done to ensure high availability. Let me give you an example.

  1. “Users” have the number of partitions set to 100 and every partition is replicated to 5 replicas (let’s say P=100, R=5)
  2. “Accounts” are configured with P=1000, R=7
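
For reference, here is a sketch of how the “Users” service from the example could be created programmatically with these settings through FabricClient (the manifest route works just as well; the service type name and the minimum replica set size below are assumptions):

using System;
using System.Fabric;
using System.Fabric.Description;
using System.Threading.Tasks;

public static class BankProvisioning
{
    public static async Task CreateUsersServiceAsync()
    {
        var client = new FabricClient();

        // "Users": 100 partitions, 5 replicas each (P=100, R=5)
        var description = new StatefulServiceDescription
        {
            ApplicationName = new Uri("fabric:/Bank"),
            ServiceName = new Uri("fabric:/Bank/Users"),
            ServiceTypeName = "UsersType",            // hypothetical type name
            HasPersistedState = true,
            TargetReplicaSetSize = 5,
            MinReplicaSetSize = 3,
            PartitionSchemeDescription = new UniformInt64RangePartitionSchemeDescription
            {
                PartitionCount = 100,
                LowKey = long.MinValue,
                HighKey = long.MaxValue
            }
        };

        await client.ServiceManager.CreateServiceAsync(description);
    }
}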

Imagine that it’s hosted on a cluster that has only 100 nodes. This means that on every node (on average) the system will place 5 replicas of “Users” and 70 replicas of “Accounts”. It’s a valid scenario. Once some nodes are added to the cluster, replicas will be automatically moved to the new nodes, lowering the saturation of the previously existing ones.

What if a node hosts more than one replica of one service, how are they hosted? Moreover, how do they communicate, as there’s only one port assigned to do it?

Cohosting to the rescue

Currently, all the replicas are hosted within the same process. Yes, although 5 “Users” instances will be created, they will all be sitting in the same AppDomain of the same process. The same goes for the 70 “Accounts”. You can check it on your own by obtaining the current process ID (PID) and AppDomain.CurrentDomain and comparing them. This reduces the hosting overhead, as static resources (loaded assemblies, static fields, types) are shared across replicas.
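
A minimal sketch of such a check; call it from any replica (for example from RunAsync) and compare the output across replicas on the same node:

using System;
using System.Diagnostics;

public static class HostingDiagnostics
{
    // Replicas cohosted on the same node report the same process id and AppDomain.
    public static string Describe()
    {
        var pid = Process.GetCurrentProcess().Id;
        var domain = AppDomain.CurrentDomain.FriendlyName;
        return $"PID: {pid}, AppDomain: {domain}";
    }
}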

One port to rule them all

By default, when using the native Service Fabric communication listener, only one port is used by an endpoint. How is it possible that the infrastructure knows how to route messages to the right partition and replica? Under the hood, when opening a communication listener, the replica registers the identifier of the partition it belongs to and its replica number. That’s how, when a message arrives, the Service Fabric infrastructure is capable of sending it to the right communication listener, and therefore, to the right service instance.
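
To make it more tangible, here is a sketch of a custom ICommunicationListener that publishes an address embedding the partition id and replica id. The address format and the port are illustrative only, but this is conceptually what gets registered so that calls can be routed to the right replica:

using System.Fabric;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Services.Communication.Runtime;

public class PartitionAwareListener : ICommunicationListener
{
    readonly StatefulServiceContext context;

    public PartitionAwareListener(StatefulServiceContext context)
    {
        this.context = context;
    }

    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        // The published address identifies this exact replica, so the infrastructure
        // can dispatch an incoming call correctly even though the node exposes one port.
        var address = $"net.tcp://{context.NodeContext.IPAddressOrFQDN}:12345/" +
                      $"{context.PartitionId}/{context.ReplicaId}";
        return Task.FromResult(address);
    }

    public Task CloseAsync(CancellationToken cancellationToken)
    {
        return Task.FromResult(true);
    }

    public void Abort()
    {
    }
}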

Summary

Now you know that all replicas of partitions of the same service on one node are cohosted in the same process and that the Service Fabric infrastructure dispatches messages according to the registered partition/replica pair.

Orchestrating processes with full recoverability

TL;DR

Do you call a few services in a row as a part of a bigger process? What if one of the calls fails? What if your hosting application fails? Do you provide a reliable way for successfully finishing your process? If not, I might have a solution for you.

Anatomy of a process

A process can be defined as at least two calls to different services. When using a client library of some sort and the C# async-await feature, one could write the following process:


var id = await invoiceService.IssueInvoice(invoiceData);
await notificationService.NotifyAboutInvoice(id);

It’s easy and straightforward. First, we want to issue an invoice. Once it’s done, a notification should be sent. Both calls, although async, are executed one after the other. Now, what if the process is halted after issuing the invoice? When we rerun it, there’s no notion that something stopped in the middle. One could hope for good logging, but what if that fails as well?

Store and forward

Here comes the solution: the DurableTask library from the Azure team. The library provides the capability of recording all the responses and replaying them without execution. All you need is to create proxies to the services using a special orchestration context.
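
As an illustration only, here is what the same process could look like as a DurableTask orchestration. This is a sketch assuming the DurableTask.Core API (TaskOrchestration, TaskActivity, ScheduleTask); InvoiceData and the two activity classes are hypothetical stand-ins for the services from the snippet above:

using System.Threading.Tasks;
using DurableTask.Core;

public class InvoiceData
{
    public string Customer { get; set; }
    public decimal Amount { get; set; }
}

public class IssueInvoiceActivity : TaskActivity<InvoiceData, string>
{
    protected override string Execute(TaskContext context, InvoiceData input)
    {
        // call invoiceService.IssueInvoice(input) here and return its id
        return "INV-001";
    }
}

public class NotifyAboutInvoiceActivity : TaskActivity<string, bool>
{
    protected override bool Execute(TaskContext context, string invoiceId)
    {
        // call notificationService.NotifyAboutInvoice(invoiceId) here
        return true;
    }
}

public class InvoiceOrchestration : TaskOrchestration<string, InvoiceData>
{
    public override async Task<string> RunTask(OrchestrationContext context, InvoiceData invoiceData)
    {
        // 1st call: the parameters and the returned id are recorded in the orchestration state
        var id = await context.ScheduleTask<string>(typeof(IssueInvoiceActivity), invoiceData);

        // 2nd call: on replay the first call is not re-executed, its recorded result is reused
        await context.ScheduleTask<bool>(typeof(NotifyAboutInvoiceActivity), id);

        return id;
    }
}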

With a process like the one above, the following state is captured during execution:

  1. The initial parameters to the instance of the process
  2. invoiceData is stored when the first call is made
  3. invoiceService returns and the response is recorded as well
  4. the returned id is stored as a parameter to the second call
  5. notificationService returns and this is marked in the state as well

As you can see, every execution step is stored, followed by storing its result. OK. But what does it mean if my process fails?

When failure occurs

What happens when a failure occurs? Let’s consider some of the possibilities.

If an error occurs between 1 and 2, the process can be restarted with the same parameters. Nothing has really happened yet.

If an error occurs between 2 and 3, the process is restarted. The parameters to the call were stored, but there’s no notion of the call to the first service. It’s called again (yes, the delivery guarantee is at-least-once).

If an error occurs between 3 and 4, the process is restarted. The response of the call to the invoice service is restored from the state (no real call is made). The parameters are established on the basis of the previous values.

And so on and so forth.

Deterministic process

Because the whole process is based either on the input data or on the results of calls already received, it’s fully deterministic. It can be safely replayed when needed. What about the non-deterministic calls that you might need? DateTime.Now comes immediately to one’s mind. You can address it by using the deterministic time provided by context.CurrentUtcDateTime.
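
A toy sketch of the difference, assuming the same DurableTask.Core types as before (the orchestration name and logic are made up):

using System;
using System.Threading.Tasks;
using DurableTask.Core;

public class NightlyCheckOrchestration : TaskOrchestration<bool, string>
{
    public override Task<bool> RunTask(OrchestrationContext context, string input)
    {
        // CurrentUtcDateTime is recorded once and replayed with the same value;
        // DateTime.UtcNow would yield a new value on every replay and break determinism.
        DateTime now = context.CurrentUtcDateTime;
        return Task.FromResult(now.Hour < 6);
    }
}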

What’s next

You can build truly powerful and reliable processes on top of it. Currently, the provided implementation is based on Azure Storage and Azure Service Bus. In a branch you can find an implementation for Service Fabric, which enables you to use it in your cluster running on your development machine, on premises or in the cloud.

Summary

Ensuring that a process can be run to a successful end isn’t an easy task. It’s good to see a library that takes the well-known and stable language construct of async-await and lifts it to the next level, making it an important tool for writing resilient orchestrations.

 

Much Better Stateful Service Fabric

TL;DR

In the last post we found the COM+ interface that is the foundation of KeyValueStoreReplica persistence. It’s time to describe how the underlying performance can be unleashed for the managed .NET world.

Internal or not, I don’t care

The sad part of this COM+ layer of Service Fabric is that it’s internal. The good part of .NET is that when running in Full Trust, you can easily overcome this obstacle with the Reflection.Emit namespace, emitting helper methods that wrap the internal types. After all, there is a public surface that you can start with. The more you know about MSIL and the internals of the CLR, and the more you love them, the fewer tears and less pain will be caused by the following code snippet. Sorry, it’s time for some IL madness.


var nonNullResult = il.DefineLabel();

// if (result == null)
//{
//    return null;
//}
il.Emit(OpCodes.Ldloc_0);
il.Emit(OpCodes.Brtrue_S, nonNullResult);
il.Emit(OpCodes.Ldnull);
il.Emit(OpCodes.Ret);

il.MarkLabel(nonNullResult);

// GC.KeepAlive(result);
il.Emit(OpCodes.Ldloc_0);
il.EmitCall(OpCodes.Call, typeof(GC).GetMethod("KeepAlive"), null);

// nativeItemResult.get_Item()
il.Emit(OpCodes.Ldloc_0);
il.EmitCall(OpCodes.Callvirt, InternalFabric.KeyValueStoreItemResultType.GetMethod("get_Item"), null);

il.EmitCall(OpCodes.Call, ReflectionHelpers.CastIntPtrToVoidPtr, null);
il.Emit(OpCodes.Stloc_1);

// empty stack, processing metadata
il.Emit(OpCodes.Ldloc_1);   // NativeTypes.FABRIC_KEY_VALUE_STORE_ITEM*
il.Emit(OpCodes.Ldfld, InternalFabric.KeyValueStoreItemType.GetField("Metadata")); // IntPtr
il.EmitCall(OpCodes.Call, ReflectionHelpers.CastIntPtrToVoidPtr, null); // void*
il.Emit(OpCodes.Stloc_2);

il.Emit(OpCodes.Ldloc_2); // NativeTypes.FABRIC_KEY_VALUE_STORE_ITEM_METADATA*
il.Emit(OpCodes.Ldfld, InternalFabric.KeyValueStoreItemMetadataType.GetField("Key")); // IntPtr

il.Emit(OpCodes.Ldloc_2); // IntPtr, NativeTypes.FABRIC_KEY_VALUE_STORE_ITEM_METADATA*
il.Emit(OpCodes.Ldfld, InternalFabric.KeyValueStoreItemMetadataType.GetField("ValueSizeInBytes")); // IntPtr, int

il.Emit(OpCodes.Ldloc_2); // IntPtr, int, NativeTypes.FABRIC_KEY_VALUE_STORE_ITEM_METADATA*
il.Emit(OpCodes.Ldfld, InternalFabric.KeyValueStoreItemMetadataType.GetField("SequenceNumber")); // IntPtr, int, long

il.Emit(OpCodes.Ldloc_1); // IntPtr, int, long, NativeTypes.FABRIC_KEY_VALUE_STORE_ITEM*
il.Emit(OpCodes.Ldfld, InternalFabric.KeyValueStoreItemType.GetField("Value")); // IntPtr (char*), int, long, IntPtr (byte*)

The part above is just a small snippet, partially responsible for reading the value. If you’re interested in more, here are over 200 lines of emit that bring the COM+ interface to the public surface. You don’t need to read it, though. SewingMachine delivers a much nicer interface for it.

RawAccessorToKeyValueStoreReplica

RawAccessorToKeyValueStoreReplica is a new high-level API provided by SewingMachine. It’s not as high level as the original KeyValueStoreReplica, as it accepts IntPtr parameters, but still, it removes a lot of layers, leaving performance, serialization and memory-management decisions to the end user of the library. You can use your own serializer, you can use stackalloc to allocate on the stack (if values are small) and much, much more. This accessor is the foundation for another feature provided by SewingMachine, called KeyValueStatefulService, a new base class for your stateful services.
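
To illustrate the stack-allocation idea only: the rawStore delegate below is a hypothetical stand-in for an IntPtr-accepting store method, not the actual SewingMachine signature:

using System;

public static class RawValueExample
{
    // General pattern: a small value is serialized onto the stack and handed to an
    // IntPtr-based API; no heap allocation and no pinning is required, because stack
    // memory is never moved by the GC.
    public static unsafe void StoreSmallValue(Action<IntPtr, int> rawStore)
    {
        byte* buffer = stackalloc byte[16];
        // serialize a value into the buffer with whatever serializer you prefer
        for (var i = 0; i < 16; i++) buffer[i] = (byte)i;

        rawStore((IntPtr)buffer, 16);
    }
}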

Summary

We saw how KeyValueStoreReplica is implemented. We took a look at the COM+ interface call sites. Finally, we observed how, by emitting IL, one can reach an internal interface, wrap it in a better abstraction and expose it to the caller. It’s time to take a look at the new stateful service.

 

Better Stateful Service Fabric

TL;DR

This is a follow-up entry to Stateful Service Fabric, which introduced all the kinds of statefulness you can get out of the box from the Fabric SDK. It’s time to move on to providing a better stateful service. First, we need to define what better means.

KeyValueStoreReplica implementation details

The underlying actors’ persistence is based on KeyValueStoreReplica. This class provides a high-level API over low-level interop interfaces provided by Service Fabric. Yep, you heard it. The runtime of the Fabric is unmanaged and is exposed via COM+ interfaces. These are then wrapped in nice .NET classes just to be delivered to you. The question is: how nice are these classes? Let’s take a look at a decompiled part of a method.


private void UpdateHelper(TransactionBase transactionBase, string key, byte[] value, long checkSequenceNumber)
{
    using (PinCollection pin = new PinCollection())
    {
        IntPtr key1 = pin.AddBlittable((object) key);
        Tuple<uint, IntPtr> nativeBytes = NativeTypes.ToNativeBytes(pin, value);
        this.nativeStore.Update(transactionBase.NativeTransactionBase, key1, (int) nativeBytes.Item1, nativeBytes.Item2, checkSequenceNumber);
    }
}

 

pinning + Interop + pointers = fun

As you can see above, when you pass a string key and a byte[] value a lot must be done to execute the update:

  1. value needs to be allocated every time

    as there’s no interface accepting a Stream or ArraySegment<byte> that would let you reuse buffers, you always need to ToArray the payload

  2. key and value are pinned for the duration of the update.

    Pinning is nothing more or less than prohibiting an object from being moved by the GC. It’s the same effect you get when using the fixed keyword or GCHandle.Alloc. When handling not that many requests, it’s OK to do it. When the GC kicks in frequently, this might be a problem.
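
For reference, the two pinning equivalents mentioned above in a minimal sketch:

using System;
using System.Runtime.InteropServices;

public static class PinningExamples
{
    public static unsafe void WithFixed(byte[] value)
    {
        // 'fixed' pins the array only for the duration of the block
        fixed (byte* p = value)
        {
            // pass (IntPtr)p to native code here
        }
    }

    public static void WithGCHandle(byte[] value)
    {
        // GCHandle.Alloc pins until Free is called; forgetting Free keeps the object pinned
        var handle = GCHandle.Alloc(value, GCHandleType.Pinned);
        try
        {
            IntPtr p = handle.AddrOfPinnedObject();
            // pass p to native code here
        }
        finally
        {
            handle.Free();
        }
    }
}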

The interesting part is the nativeStore field that provides the COM+ seam to the Fabric internal interface. This is the interface that is closest to the Fabric surface and that allows one to squeeze performance out of it.

Summary

You can probably see where this leads us. Seeing that underneath the .NET wrapper there is a COM+ interface that is much more low level and allows the use of raw memory, we can try to access it directly, skipping KeyValueStoreReplica altogether, and write a custom implementation that will maximize performance.

 

Stateful Service Fabric

TL;DR

This is the next entry in a series covering my journey with Service Fabric and my Open Source project called SewingMachine. After hitting the internal wall of Service Fabric I pivoted my approach, as I knew that I couldn’t change the Actors’ part the way I wanted (too many internals) and needed to work on a different level.

Underneath it’s all stateful

It doesn’t matter if you use StatefulService or the actor model of Service Fabric. In the first case you explicitly work with state using reliable collections: a distributed, transactional dictionary and a queue. You can access them using the StateManager of your service derived from the mentioned base class.

In the case of actors, you just register an actor in the Actors’ runtime. Underneath, this results in creating a stateful ActorService. Both StatefulService and ActorService derive from StatefulServiceBase. The way they provide and operate on the state is quite different.

Reliable collections

Reliable collections provide a dictionary and a queue. Both are transactional, so you can wrap any operations in a regular transaction. They provide different semantics: the dictionary is a regular dictionary, a lookup or a hashmap if you prefer, while the queue is a regular FIFO structure. The transactions have somewhat different semantics from regular DB transactions, but for now you may treat them as regular ones.
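
A minimal sketch of the dictionary used from inside a StatefulService (the collection name and the update logic are illustrative):

using System.Fabric;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Data.Collections;
using Microsoft.ServiceFabric.Services.Runtime;

public class CounterService : StatefulService
{
    public CounterService(StatefulServiceContext context) : base(context) { }

    protected override async Task RunAsync(CancellationToken cancellationToken)
    {
        var counters = await StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("counters");

        // every operation on a reliable collection is wrapped in a transaction
        using (var tx = StateManager.CreateTransaction())
        {
            await counters.AddOrUpdateAsync(tx, "visits", 1, (key, current) => current + 1);
            await tx.CommitAsync();
        }
    }
}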

If the stateful services use reliable collections, is it the same for actors? It looks like it’s not.

KeyValueStoreReplica

The storage for actors is provided by KeyValueStoreReplica, already covered here and here. If you asked whether it is that much different from the reliable collections, I’d say yes. It has different semantics and provides a simple, replicated key-value store. If it is so different, could we build something other than an actor system on top of it?
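
For comparison, a sketch of what working with the raw store looks like, assuming the public KeyValueStoreReplica surface (the key and value here are illustrative):

using System.Fabric;
using System.Text;

public static class KvsExample
{
    // KeyValueStoreReplica works on string keys and byte[] values only,
    // with its own transactions; there are no typed collections here.
    public static void AddUser(KeyValueStoreReplica store)
    {
        using (var tx = store.CreateTransaction())
        {
            store.Add(tx, "user/1", Encoding.UTF8.GetBytes("{ \"name\": \"John\" }"));
            tx.CommitAsync().GetAwaiter().GetResult();
        }
    }
}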

The answer will be revealed in the very next post.

Hitting internal wall in Service Fabric

TL;DR

In this post I share my experience in trying to extend Service Fabric for Sewing Machine purposes.

Sewing Machine aim

The aim of Sewing Machine is to extend the Service Fabric actor model with better, faster, less-allocating foundations. The only part I have been working on so far is persisted actors. These are the ones stored in the KeyValueStoreReplica I’ve been writing about for the last few weeks.

Not-so public seam

I started my work by discovering how the persisted actors are implemented. The class responsible for it is named KvsActorStateProvider. It uses the following components:

  • KeyValueStoreWrapper – private
  • VolatileLogicalTimeManager.ISnapshotHandler – internal
  • VolatileLogicalTimeManager – internal
  • IActorStateProviderInternal – internal
  • ActorStateProviderHelper – internal, responsible for shared logic among providers
  • IActorStateProvider – public interface to implement

As you can see, the only part that is given is the seam of the state provider. Every single helper that one could use to implement their own is internal. Additionally, the provider interface is filled with other interfaces that one needs to implement. I know sharing data structures isn’t the best option, but as Service Fabric shares them internally, why not give them to the user?

All of the above means that extending the actors’ runtime is hard, if not impossible. It provides no real extension points and its public seam is not prepared for it. What does this mean for SewingMachine?

Can’t win, change battle

Rewriting the runtime would be time-consuming. I can’t spend half a year on writing it and don’t want to. At least, not now. I’ve implemented a faster, unsafe wrapper around KeyValueStoreReplica that can still be useful. To make an impact with SewingMachine, I’ll introduce the event-driven actor first, even if cleanup is run by the not-so-efficient regular Actor disposal. By using a custom serializer and adhering to the currently used prefixes, it can later be changed to use a custom runtime. The only problem would be reminders, but this can be handled as well by better versioning of them.

Summary

SewingMachine was meant to extend the actors’ runtime. Seeing difficulties like the ones described above, I could either kill it or repurpose it to provide real value first, leaving performance for later. That’s how we’ll do it.