Hot or not? Data inside of Service Fabric

TL;DR

When calculating the space needed for your Service Fabric cluster, especially in Azure, one can hit machine limits. After all, a D2 instance has only 100 GiB of local disk, and this is the disk Service Fabric uses to store its data. 100 GiB might not be that small, but if you use your cluster for more than one application, you can hit the wall.

Why local?

There’s a reason behind using the local, ephemeral disk for Service Fabric storage. The reason is locality. As Service Fabric replicates the data, you don’t need to keep it in highly available storage; the cluster provides that on its own. Storing multiple copies via Azure Storage Services is not needed. Additionally, using local SSD drives is much faster. It’s a truly local disk after all.

Saturation

Service Fabric is designed to run many applications with many partitions. After all, you want to keep your cluster (almost) saturated, as using a few VMs just to run an app that is needed once a month would be wasteful. And if you run many applications, you need to think about capacity. Yes, you might be running stateless services, which don’t need storage capacity, but not using stateful services would be a waste: they provide an efficient, transactional, replicated database built right into them. So what about the data? What if you saturate the cluster not in terms of CPU but of storage?

Hot or not

One of the approaches you could use is a separation between hot and cold data. For instance, users who haven’t logged in for a month could have their data considered cold. This data could be offloaded from the cluster to Azure Storage Services, leaving more space for the data that is actually needed. When writing applications that use an append-only model (for instance, ones based on event sourcing), you could think about offloading events older than X days, at the same time ensuring that they can still be accessed. Yes, the access will be slower, but it’s unlikely that you’ll need them on a regular basis.
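To make it concrete, here is a minimal sketch of such an offloader, assuming the classic Microsoft.WindowsAzure.Storage SDK; the Event type, LoadEventsOlderThan and DeleteEventsOlderThan are hypothetical stand-ins for your store’s API:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

class Event
{
    public string Id;
    public byte[] Payload;
}

class ColdDataOffloader
{
    // Moves events older than the cut-off from the replica's local store to blobs.
    public async Task OffloadAsync(string connectionString, DateTime cutOff)
    {
        var account = CloudStorageAccount.Parse(connectionString);
        var container = account.CreateCloudBlobClient().GetContainerReference("cold-events");
        await container.CreateIfNotExistsAsync();

        foreach (var e in LoadEventsOlderThan(cutOff))
        {
            // One blob per event keeps cold reads possible, just slower.
            var blob = container.GetBlockBlobReference(e.Id);
            await blob.UploadFromByteArrayAsync(e.Payload, 0, e.Payload.Length);
        }

        DeleteEventsOlderThan(cutOff); // reclaim the local disk space
    }

    // Hypothetical store accessors; wire them to your reliable collection or store.
    IEnumerable<Event> LoadEventsOlderThan(DateTime cutOff) { yield break; }
    void DeleteEventsOlderThan(DateTime cutOff) { }
}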

Summary

When designing your Service Fabric apps and planning your cluster capacity, think through the hot/cold approach as well. This could lower your storage-space requirements and enable you to use the same cluster for more applications, which effectively is what Service Fabric is for.

ProtobufRaw vs protobuf-net

TL;DR

I’m currently working on SewingMachine, an OSS project of mine aimed at unleashing the ultimate performance for your stateful services written in/for Service Fabric (more posts: here). In this post I test further (the previous test is here) whether it would be beneficial to write a custom unmanaged writer for protobuf-net using stack-allocated memory.

SewingMachine is raw, very raw

SewingMachine works with pointers. When storing data, you pass an IntPtr and a length as the value. Effectively, this means that if you use a managed structure to serialize your data, you’ll eventually need either to pin it (pinning tells the GC not to move the object around for as long as it’s pinned) or to have it pinned from the very beginning (this approach can be beneficial if an object is large and has a long lifetime). If you don’t want to use managed memory, you can always use stackalloc to allocate a small amount of memory on the stack, serialize into it, and then pass it as an IntPtr. This is the approach I’m testing now.
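A minimal sketch of the stackalloc path, where Store is a hypothetical stand-in for a SewingMachine-style call accepting raw memory:

using System;

unsafe class StackAllocWrite
{
    // Hypothetical storage call taking raw memory, as SewingMachine's API does.
    static void Store(IntPtr data, int length) { /* interop call goes here */ }

    static void SaveSmallPayload(long id, long amount)
    {
        const int size = 16;
        byte* buffer = stackalloc byte[size]; // stack memory: invisible to the GC, nothing to pin

        *(long*)buffer = id;                  // serialize straight into the raw buffer
        *(long*)(buffer + 8) = amount;

        Store(new IntPtr(buffer), size);      // the pointer is valid for the duration of the call
    }
}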

Small, fixed sized payloads

If a payload, whether it’s an event or a message, is small and contains no fields of variable length (strings, arrays), you can estimate the maximum size it will take when serialized. Then, instead of using the regular protobuf-net serializer, you can write (or emit during a post-compilation step) a custom function that serializes the given type, like I did in this spike. Then it’s time to test it.
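The spike is linked above; purely as an illustration, a hand-rolled writer for a hypothetical two-field message could look like this, emitting protobuf-compatible fixed64 fields (the tag byte is (fieldNumber << 3) | wireType; a little-endian platform is assumed):

unsafe struct AccountCredited
{
    public long AccountId; // field 1, wire type 1 (fixed64)
    public long Amount;    // field 2, wire type 1 (fixed64)

    // No variable-length fields, so the maximum size is known up front.
    public const int MaxSize = 2 * (1 + 8); // one tag byte + eight bytes per field

    public int WriteTo(byte* buffer)
    {
        var p = buffer;
        *p++ = (1 << 3) | 1;      // tag: field 1, fixed64
        *(long*)p = AccountId;
        p += 8;
        *p++ = (2 << 3) | 1;      // tag: field 2, fixed64
        *(long*)p = Amount;
        p += 8;
        return (int)(p - buffer); // bytes written, here always MaxSize
    }
}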

Performance tests and over 10x improvement

Again, as in the previous post about memory, the unsafe stackalloc version shows that it could be beneficial to invest some more time, as the performance benefit is just amazing. The raw version is 10x faster. Wow!

[Figure: protoraw_vs_proto, benchmark summary]

Summary

Sometimes using raw unsafe code improves performance. It’s worth trying, especially in situations where the interface you communicate with is already unsafe and requires unsafe structures.

ThreadStatic vs stackalloc

TL;DR

I’m currently working on SewingMachine, an OSS project of mine aimed at unleashing the ultimate performance for your stateful services written in/for Service Fabric (more posts: here). In this post I test whether it would be beneficial to write a custom unmanaged writer for protobuf-net instead of using some kind of object pooling based on ThreadStatic.

ThreadStatic and stackalloc

ThreadStatic is the old black. It was good to use before async-await was introduced. Now, when you don’t know which thread your continuation will run on, it’s not that useful. Still, if you’re on a good old-fashioned synchronous path, it can be used for object pooling, keeping one object per thread. That’s how protobuf-net caches ProtoReader objects.

One could use it to locally cache a chunk of memory for serialization. This could be a managed or an unmanaged chunk, but eventually it would be used to pass data to some storage (in my case, SewingSession from SewingMachine). If the interface accepted unmanaged chunks, I could also use stackalloc for small objects whose maximum size I know up front. stackalloc provides a way to allocate a number of bytes from the stack frame. Yes, it’s unsafe, so keep your seat belt fastened.
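A minimal sketch of the ThreadStatic flavor of such a pool (the names are mine, not protobuf-net’s):

using System;

static class PerThreadBuffer
{
    // One array per thread: no locking, but only safe on synchronous paths,
    // as an await may resume on a different thread with a different buffer.
    [ThreadStatic]
    static byte[] buffer;

    public static byte[] Rent(int minSize)
    {
        var local = buffer;
        if (local == null || local.Length < minSize)
            buffer = local = new byte[minSize];
        return local;
    }
}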

ThreadStatic vs stackalloc

I gave it a try and wrote a simple test (if it’s dumb, I encourage you to share your thoughts in the comments) that writes using either a ThreadStatic-pooled array or a stack-allocated buffer. You can find it in this gist.

How to test it? As always, BenchmarkDotNet, the best benchmarking tool for any .NET dev, comes to the rescue. Let’s take a look at the summary now.
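For reference, a minimal benchmark along the same lines (my own reconstruction, not the code from the gist) could look as follows:

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

public unsafe class ThreadStaticVsStackalloc
{
    const int Size = 64;

    [ThreadStatic]
    static byte[] pooled;

    [Benchmark(Baseline = true)]
    public void ThreadStaticBuffer()
    {
        var buffer = pooled ?? (pooled = new byte[Size]);
        fixed (byte* p = buffer) // pinning is needed to obtain a raw pointer
        {
            Write(p);
        }
    }

    [Benchmark]
    public void StackAllocated()
    {
        byte* p = stackalloc byte[Size]; // no pinning, memory dies with the frame
        Write(p);
    }

    static void Write(byte* p)
    {
        for (var i = 0; i < Size; i++)
            p[i] = (byte)i;
    }
}

class Program
{
    static void Main() => BenchmarkRunner.Run<ThreadStaticVsStackalloc>();
}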

[Figure: local_vs_threadstatic, benchmark summary]

Stackalloc wins.

There are several things that should be taken into consideration: the finally block, the real overhead of writing a whole object, and so on and so forth. Still, it looks like, for heavily optimized code and small objects, one could use this approach to write them a bit faster.

Summary

Using stack-allocated buffers is fun and can bring some performance benefits. If I find anything unusual or worth noticing with this approach, I’ll share my findings. As always when working on performance: measure first, measure in the middle, and measure at the end.

How does Service Fabric host your services?

TL;DR

Service Fabric provides amazing, fully automated hosting for any number of services with any number of instances each (up to the physical limits of your cluster). But how are they hosted? What if you have more partitions than nodes?

Structure recap

When building an app that is meant to be hosted in Service Fabric, you build an… app. This application might consist of multiple stateful and stateless services. The application is packaged into a… package that, when uploaded to the cluster as an image, provides an application type. From this type, you can instantiate multiple application instances. Let me give you an example.

Application “Bank” consists of two services:

  1. “Users”
  2. “Accounts”

When built with version “1.0.0” and packaged, it can be uploaded to the SF cluster, where it is registered as “Bank 1.0.0”. From now on you can instantiate as many banks as you want within your cluster. Each will be composed of two sets of services: “Users” and “Accounts”.
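A minimal sketch of instantiating two banks from the registered type via FabricClient (the fabric:/ names are arbitrary):

using System;
using System.Fabric;
using System.Fabric.Description;
using System.Threading.Tasks;

class BankProvisioning
{
    static async Task CreateBanksAsync()
    {
        using (var client = new FabricClient())
        {
            // Two independent application instances of the same "Bank 1.0.0" type.
            await client.ApplicationManager.CreateApplicationAsync(
                new ApplicationDescription(new Uri("fabric:/BankA"), "Bank", "1.0.0"));
            await client.ApplicationManager.CreateApplicationAsync(
                new ApplicationDescription(new Uri("fabric:/BankB"), "Bank", "1.0.0"));
        }
    }
}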

Services, stateful services

When defining stateful services, the ones that have a kind of built-in database (reliable collections, or SewingSession provided by my project SewingMachine), you need to define how many partitions they will have. You can think of partitions as separate databases. Additionally, you define the number of replicas every partition will have; that’s done to ensure high availability. Let me give you an example.

  1. “Users” has the number of partitions set to 100 and every partition is replicated to 5 replicas (let’s say P=100, R=5)
  2. “Accounts” is configured with P=1000, R=7

Imagine that this is hosted on a cluster that has only 100 nodes. This means that, on average, the system will place on every node 5 replicas of “Users” (100 × 5 / 100 nodes) and 70 replicas of “Accounts” (1000 × 7 / 100 nodes). It’s a valid scenario. Once some nodes are added to the cluster, replicas will be automatically moved to the new nodes, lowering the saturation of the previously existing ones.
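Declared through the API rather than the application manifest, the “Users” configuration might look like this (service and type names are illustrative):

using System;
using System.Fabric;
using System.Fabric.Description;
using System.Threading.Tasks;

class UsersProvisioning
{
    static async Task CreateUsersAsync(FabricClient client)
    {
        var users = new StatefulServiceDescription
        {
            ApplicationName = new Uri("fabric:/BankA"),
            ServiceName = new Uri("fabric:/BankA/Users"),
            ServiceTypeName = "UsersType",
            HasPersistedState = true,
            TargetReplicaSetSize = 5, // R = 5
            MinReplicaSetSize = 3,
            PartitionSchemeDescription = new UniformInt64RangePartitionSchemeDescription
            {
                PartitionCount = 100, // P = 100
                LowKey = long.MinValue,
                HighKey = long.MaxValue
            }
        };

        await client.ServiceManager.CreateServiceAsync(users);
    }
}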

What if a node hosts more than one replica of one service? How are they hosted? Moreover, how do they communicate, given that there’s only one port assigned for it?

Cohosting to the rescue

Currently, all the replicas are hosted within the same process. Yes, although 5 “Users” replicas will be created, they will all be sitting in the same AppDomain of the same process. The same goes for the 70 “Accounts” replicas. You can check it on your own by obtaining the current process ID (PID) and AppDomain.CurrentDomain and comparing them. This reduces the hosting overhead, as all static resources (loaded assemblies, static fields, types) are shared across replicas.
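A quick way to verify it from within a replica, for instance in RunAsync (Console is used here just for brevity; a real service would log via ETW):

using System;
using System.Diagnostics;

static class CohostingCheck
{
    // Call this from each replica: cohosted replicas report the same values.
    public static void Report()
    {
        var pid = Process.GetCurrentProcess().Id;
        var domain = AppDomain.CurrentDomain.FriendlyName;
        Console.WriteLine($"PID: {pid}, AppDomain: {domain}");
    }
}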

One port to rule them all

By default, when using the native Service Fabric communication listener, only one port is used per endpoint. How is it possible that the infrastructure knows how to route messages to the right partition and replica? Under the hood, when opening a communication listener, the replica registers the identifier of the partition it belongs to along with its replica number. That’s how, when a message arrives, the Service Fabric infrastructure is capable of sending it to the right communication listener and, therefore, to the right service instance.
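A minimal sketch of a listener doing this by hand; the address format and the port are illustrative, but PartitionId and ReplicaId come straight from the service context:

using System.Fabric;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Services.Communication.Runtime;

class PartitionAwareListener : ICommunicationListener
{
    readonly StatefulServiceContext context;

    public PartitionAwareListener(StatefulServiceContext context)
    {
        this.context = context;
    }

    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        // The published address is stamped with the partition and replica ids,
        // which is what lets the infrastructure route to this exact replica.
        var address = $"net.tcp://{context.NodeContext.IPAddressOrFQDN}:1234/" +
                      $"{context.PartitionId}/{context.ReplicaId}";
        return Task.FromResult(address);
    }

    public Task CloseAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    public void Abort() { }
}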

Summary

Now you know that all replicas of partitions of the same service on one node are cohosted in the same process, and that the Service Fabric infrastructure dispatches messages according to the registered partition/replica pair.

Orchestrating processes with full recoverability

TL;DR

Do you call a few services in a row as part of a bigger process? What if one of the calls fails? What if your hosting application fails? Do you have a reliable way of running your process to a successful end? If not, I might have a solution for you.

Anatomy of a process

A process can be defined as at least two calls to different services. Using a client library of some sort and C#’s async-await feature, one could write the following process:


var id = await invoiceService.IssueInvoice(invoiceData);
await notificationService.NotifyAboutInvoice(id);

It’s easy and straightforward. First, we want to issue an invoice. Once that’s done, a notification should be sent. Both calls, although async, are executed step by step. Now, what if the process is halted after issuing the invoice? When we rerun it, there’s no notion of something having stopped in the middle. One could hope for good logging, but what if that fails as well?

Store and forward

Here comes the solution: the DurableTask library, provided by the Azure team. The library provides the capability of recording all the responses and replaying them without re-executing the calls. All you need is to create proxies to the services using a special orchestration context.
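To make the mechanics concrete, here is a minimal sketch of the invoice process as a DurableTask orchestration; InvoiceData, IssueInvoiceActivity and NotifyActivity are hypothetical names, and the exact namespaces may differ between library versions:

using System;
using System.Threading.Tasks;
using DurableTask; // DurableTask.Core in newer versions

public class InvoiceData
{
    public decimal Amount;
}

public class InvoiceOrchestration : TaskOrchestration<string, InvoiceData>
{
    public override async Task<string> RunTask(OrchestrationContext context, InvoiceData invoiceData)
    {
        // Each result lands in the orchestration history; on replay it is
        // served from the recorded state instead of calling the service again.
        var id = await context.ScheduleTask<string>(typeof(IssueInvoiceActivity), invoiceData);
        await context.ScheduleTask<bool>(typeof(NotifyActivity), id);

        // Deterministic time for replays, instead of DateTime.UtcNow (more on this below).
        var issuedAt = context.CurrentUtcDateTime;
        return id;
    }
}

public class IssueInvoiceActivity : TaskActivity<InvoiceData, string>
{
    protected override string Execute(TaskContext context, InvoiceData input)
    {
        return "INV-0001"; // the real call to invoiceService would go here
    }
}

public class NotifyActivity : TaskActivity<string, bool>
{
    protected override bool Execute(TaskContext context, string invoiceId)
    {
        return true; // the real call to notificationService would go here
    }
}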

With a process like the one above, the following state is captured during execution:

  1. The initial parameters of the process instance are stored
  2. invoiceData is stored when the first call is made
  3. invoiceService returns and the response is recorded as well
  4. The returned id is stored as a parameter of the second call
  5. notificationService returns and this is marked in the state as well

As you can see, every call is stored, followed by storing its result. OK. But what does it mean if my process fails?

When failure occurs

What happens when a failure occurs? Let’s consider some of the possibilities.

If an error occurs between 1 and 2, the process can be restarted with the same parameters. Nothing has really happened yet.

If an error occurs between 2 and 3, the process is restarted. The parameters of the call were stored, but there’s no notion of the call to the first service, so it’s called again (yes, the delivery guarantee is at-least-once).

If an error occurs between 3 and 4, the process is restarted. The response of the call to the invoice service is restored from the state (no real call is made), and the parameters of the next call are established on the basis of the previous values.

And so on and so forth.

Deterministic process

Because the whole process is based either on the input data or on the results of calls already received, it’s fully deterministic and can be safely replayed when needed. What about the non-deterministic calls that you might need? DateTime.Now immediately comes to mind. You can address it by using the deterministic time provided by context.CurrentUtcDateTime.

What’s next

You can build truly powerful and reliable processes on top of it. Currently, the provided implementation is based on Azure Storage and Azure Service Bus. In a branch you can find an implementation for Service Fabric, which enables you to use it in your cluster, whether it runs on your development machine, on premises, or in the cloud.

Summary

Ensuring that a process can run to a successful end isn’t an easy task. It’s good to see a library that takes the well-known and stable async-await language construct and lifts it to the next level, making it an important tool for writing resilient orchestrations.


Much Better Stateful Service Fabric

TL;DR

In the last post we found the COM+ interface that is the foundation of KeyValueStoreReplica persistence. It’s time to describe how its underlying performance can be unleashed for the managed .NET world.

Internal or not, I don’t care

The sad part of this COM+ layer of Service Fabric is that it’s internal. The good part of .NET is that, when running in Full Trust, you can easily overcome this obstacle with the Reflection.Emit namespace, emitting helper methods that wrap the internal types. After all, there is a public surface you can start with. The more you know about MSIL and the internals of the CLR (and the more you love them), the fewer tears and less pain the following snippet will cause. Sorry, it’s time for some IL madness.


var nonNullResult = il.DefineLabel();

// if (result == null)
//{
//    return null;
//}
il.Emit(OpCodes.Ldloc_0);
il.Emit(OpCodes.Brtrue_S, nonNullResult);
il.Emit(OpCodes.Ldnull);
il.Emit(OpCodes.Ret);

il.MarkLabel(nonNullResult);

// GC.KeepAlive(result);
il.Emit(OpCodes.Ldloc_0);
il.EmitCall(OpCodes.Call, typeof(GC).GetMethod("KeepAlive"), null);

// nativeItemResult.get_Item()
il.Emit(OpCodes.Ldloc_0);
il.EmitCall(OpCodes.Callvirt, InternalFabric.KeyValueStoreItemResultType.GetMethod("get_Item"), null);

il.EmitCall(OpCodes.Call, ReflectionHelpers.CastIntPtrToVoidPtr, null);
il.Emit(OpCodes.Stloc_1);

// empty stack, processing metadata
il.Emit(OpCodes.Ldloc_1);   // NativeTypes.FABRIC_KEY_VALUE_STORE_ITEM*
il.Emit(OpCodes.Ldfld, InternalFabric.KeyValueStoreItemType.GetField("Metadata")); // IntPtr
il.EmitCall(OpCodes.Call, ReflectionHelpers.CastIntPtrToVoidPtr, null); // void*
il.Emit(OpCodes.Stloc_2);

il.Emit(OpCodes.Ldloc_2); // NativeTypes.FABRIC_KEY_VALUE_STORE_ITEM_METADATA*
il.Emit(OpCodes.Ldfld, InternalFabric.KeyValueStoreItemMetadataType.GetField("Key")); // IntPtr

il.Emit(OpCodes.Ldloc_2); // IntPtr, NativeTypes.FABRIC_KEY_VALUE_STORE_ITEM_METADATA*
il.Emit(OpCodes.Ldfld, InternalFabric.KeyValueStoreItemMetadataType.GetField("ValueSizeInBytes")); // IntPtr, int

il.Emit(OpCodes.Ldloc_2); // IntPtr, int, NativeTypes.FABRIC_KEY_VALUE_STORE_ITEM_METADATA*
il.Emit(OpCodes.Ldfld, InternalFabric.KeyValueStoreItemMetadataType.GetField("SequenceNumber")); // IntPtr, int, long

il.Emit(OpCodes.Ldloc_1); // IntPtr, int, long, NativeTypes.FABRIC_KEY_VALUE_STORE_ITEM*
il.Emit(OpCodes.Ldfld, InternalFabric.KeyValueStoreItemType.GetField("Value")); // IntPtr (char*), int, long, IntPtr (byte*)

The part above is just a small snippet, partially responsible for reading a value. If you’re interested in more, here are over 200 lines of emit that bring the COM+ interface to the public surface. You don’t need to read them, though: SewingMachine delivers a much nicer interface on top.

RawAccessorToKeyValueStoreReplica

RawAccessorToKeyValueStoreReplica is a new high-level API provided by SewingMachine. It’s not as high-level as the original KeyValueStoreReplica, as it accepts IntPtr parameters, but it removes a lot of layers, leaving the performance, serialization, and memory-management decisions to the end user of the library. You can use your own serializer, you can use stackalloc to allocate on the stack (if values are small), and much, much more. This accessor is the foundation for another feature provided by SewingMachine, called KeyValueStatefulService, a new base class for your stateful services.
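To give a feel for what such a raw, IntPtr-based API enables, here is an illustrative sketch only; the shape of IRawStore below is hypothetical and does not claim to match the actual RawAccessorToKeyValueStoreReplica signatures:

using System;

// Hypothetical shape of a raw accessor, for illustration only.
interface IRawStore
{
    void Add(IntPtr key, IntPtr value, int length);
}

unsafe class RawUsageSketch
{
    static void AddSmallValue(IRawStore store, IntPtr key)
    {
        byte* value = stackalloc byte[16]; // small payload: stack-allocated, nothing pinned
        *(long*)value = 1L;
        *(long*)(value + 8) = 2L;

        store.Add(key, new IntPtr(value), 16);
    }
}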

Summary

We saw how KeyValueStoreReplica is implemented. We took a look at the COM+ interface call sites. Finally, we observed how, by emitting IL, one can expose an internal interface, wrap it in a better abstraction and expose it to the caller. It’s time to take a look at the new stateful service.


Better Stateful Service Fabric

TL;DR

This is a follow-up to Stateful Service Fabric, which introduced all the kinds of statefulness you can get out of the box from the Fabric SDK. It’s time to move on to providing a better stateful service. First, we need to define what better means.

KeyValueStoreReplica implementation details

The underlying actors’ persistence is based on KeyValueStoreReplica. This class provides a high-level API over the low-level interop interfaces provided by Service Fabric. Yep, you heard it: the runtime of the Fabric is unmanaged and is exposed via COM+ interfaces, which are then wrapped in nice .NET classes just to be delivered to you. The question is: how nice are these classes? Let’s take a look at a decompiled part of a method.


private void UpdateHelper(TransactionBase transactionBase, string key, byte[] value, long checkSequenceNumber)
{
    using (PinCollection pin = new PinCollection())
    {
        IntPtr key1 = pin.AddBlittable((object) key);
        Tuple<uint, IntPtr> nativeBytes = NativeTypes.ToNativeBytes(pin, value);
        this.nativeStore.Update(transactionBase.NativeTransactionBase, key1, (int) nativeBytes.Item1, nativeBytes.Item2, checkSequenceNumber);
    }
}


pinning + Interop + pointers = fun

As you can see above, when you pass a string key and a byte[] value, a lot must be done to execute the update:

  1. value needs to be allocated every time.

    As there’s no interface accepting a Stream or an ArraySegment<byte> that would allow you to reuse previously allocated bytes, you always need to ToArray the payload.

  2. key and value are pinned for the duration of the update.

    Pinning is nothing more or less than prohibiting an object from being moved by the GC. It gives the same effect as using the fixed keyword or GCHandle.Alloc (see the sketch below). When handling not that many requests, it’s OK to do it. When the GC kicks in frequently, this might become a problem.
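For illustration, the two pinning flavors mentioned above; Consume stands in for any interop call taking raw memory:

using System;
using System.Runtime.InteropServices;

class PinningSample
{
    static unsafe void PassPinned(byte[] payload)
    {
        // fixed pins the array only for the scope of the block...
        fixed (byte* p = payload)
        {
            Consume(new IntPtr(p), payload.Length);
        }

        // ...while GCHandle keeps it pinned until explicitly freed.
        var handle = GCHandle.Alloc(payload, GCHandleType.Pinned);
        try
        {
            Consume(handle.AddrOfPinnedObject(), payload.Length);
        }
        finally
        {
            handle.Free();
        }
    }

    static void Consume(IntPtr data, int length) { /* interop call */ }
}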

The interesting part is the nativeStore field, which provides the COM+ seam to the Fabric’s internal interface. This is the interface closest to the Fabric surface, and the one that allows squeezing performance out of it.

Summary

You can probably see where this leads us. Seeing that underneath the .NET wrapper there is a COM+ interface that is much more low-level and allows using raw memory, we can try to access it directly, skipping KeyValueStoreReplica altogether, and write a custom implementation that will make it possible to maximize performance.