Await Now or Never

Intro

This post continues the topic of implementing a custom scheduler for your orchestrations. We saw that the Delay operation either completes immediately or returns a never-ending task that nobody ever completes. Could we make it simpler and provide a better way of delaying an operation?

Complete or not, here I come

The completed Delay operation was implemented with:


Task.CompletedTask

This static property returns a cached instance of a task that has already completed. If you need to return a completed task because your asynchronous method finished its work synchronously, this is the best way to do it.

For cases where we don’t want continuations to be run, we used:


new TaskCompletionSource<object>().Task

which, of course, allocates both the TaskCompletionSource instance and the underlying Task object. It's not much, but as there are only two possible outcomes for the continuation, now or never, maybe we could provide a smaller tool for this that does not allocate at all.

Now OR Never

You probably know that you can create your own awaitable objects; you are not limited to awaiting Tasks. Let's take a look at the following class:


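using System;
using System.Runtime.CompilerServices;

// An awaitable that either continues synchronously (Now) or never continues (Never).
public sealed class NowOrNever : ICriticalNotifyCompletion
{
  public static readonly NowOrNever Never = new NowOrNever(false);
  public static readonly NowOrNever Now = new NowOrNever(true);

  NowOrNever(bool isCompleted)
  {
    IsCompleted = isCompleted;
  }

  // The object is its own awaiter.
  public NowOrNever GetAwaiter()
  {
    return this;
  }

  // Nothing to return and no exception to rethrow.
  public void GetResult() { }

  public bool IsCompleted { get; }

  // Continuations are dropped on purpose: Now never needs one, Never never runs one.
  public void OnCompleted(Action continuation) { }

  public void UnsafeOnCompleted(Action continuation) { }
}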

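This class is awaitable, as it provides the members the compiler looks for:

  1. GetAwaiter – to obtain the awaiter that is used to create the asynchronous flow (here, the object itself)
  2. IsCompleted – to check whether the operation has already finished (synchronously or fast enough), so that the whole machinery of an asynchronous dispatch does not have to be built
  3. GetResult – to obtain the result (or rethrow the exception) once the operation is done; here there is nothing to return
  4. OnCompleted/UnsafeOnCompleted – to register the continuation that should run when the operation completes (required by INotifyCompletion and ICriticalNotifyCompletion)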

Knowing what these parts are for, let's take a look at the values provided by the NowOrNever static fields:

NowOrNever | IsCompleted | OnCompleted / UnsafeOnCompleted
Now        | true        | no action
Never      | false       | no action

As you can see, the continuation is never invoked at all. For the Never case, that's exactly what we want. What about Now? Take a look at the table again: whenever IsCompleted is true, the compiler-generated code does not attach a continuation and simply runs the rest of the method synchronously. There is nothing to preserve, as there are no continuations.
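To see both cases in action, here is a small self-contained check. It is a sketch, not the author's code. It includes a compact copy of NowOrNever and a hypothetical Delay(bool due) helper that stands in for the orchestration's Delay: it returns Now when the delay has already passed and Never otherwise.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

public sealed class NowOrNever : ICriticalNotifyCompletion
{
    public static readonly NowOrNever Never = new NowOrNever(false);
    public static readonly NowOrNever Now = new NowOrNever(true);

    NowOrNever(bool isCompleted) { IsCompleted = isCompleted; }

    public NowOrNever GetAwaiter() => this;
    public void GetResult() { }
    public bool IsCompleted { get; }

    // Continuations are dropped: Now never needs one, Never never runs one.
    public void OnCompleted(Action continuation) { }
    public void UnsafeOnCompleted(Action continuation) { }
}

public static class NowOrNeverDemo
{
    public static bool ReachedAfterNow;
    public static bool ReachedAfterNever;

    // Hypothetical stand-in for the orchestration's Delay: 'due' means the delay has already passed.
    static NowOrNever Delay(bool due) => due ? NowOrNever.Now : NowOrNever.Never;

    public static async Task RunNow()
    {
        await Delay(due: true);
        ReachedAfterNow = true;   // IsCompleted was true: runs synchronously, nothing attached
    }

    public static async Task RunNever()
    {
        await Delay(due: false);
        ReachedAfterNever = true; // never reached: the continuation is passed to UnsafeOnCompleted and dropped
    }
}
```

Awaiting Now completes the async method synchronously. Awaiting Never leaves the method's task incomplete forever, and Delay itself allocates nothing.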

Summary

Writing a custom awaitable class is not a day-to-day activity, but in some cases it can bring a limited benefit. In the NowOrNever case, it allowed us to skip the allocation of a single task, although, admittedly, the async state machine created by the compiler probably takes much more than a single task instance.

Implementing a scheduler for your orchestrations

TL;DR

We've already seen here and here that with async-await one can easily sketch an orchestration/saga for any process that should be both robust and resilient. It's time to take a look at how a scheduler for such a process could be implemented.

Delay with no Task

Usually, when we want to delay an action in an asynchronous flow, we use Task.Delay. It returns a task that completes after the specified delay, so awaiting it schedules the rest of our code as a continuation to be executed once the delay has passed. The usage is as simple as:


await Task.Delay(TimeSpan.FromSeconds(1.5));

This is fine when we want to postpone an action for a few seconds, but what about processes that need to be frozen for days? How could you implement that?

First, let's replace the call with a Delay method provided by a base orchestration class (you can always have a base class, can't you?).


await this.Delay(TimeSpan.FromSeconds(1.5));

With this assumption, we can move forward and take a look at a possible Delay implementation.

Delay for Orchestrations

The whole idea of this orchestration is based on snapshotting its changes as events and making them replayable. In other words, if a failure occurs, the orchestration process should be resurrected on another node with no changes in the flow. This makes the implementation a bit trickier, but it is needed to provide strong foundations for our processes. Let's take a look at a possible implementation of Delay.


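protected Task Delay(TimeSpan delay)
{
  // Recorded "now": on a replay this returns the value captured during the first execution.
  var date = GetDateTimeUtcNow();
  var scheduleAt = date + delay;

  // Replay: a ScheduledAt event recorded by a previous execution exists.
  ScheduledAt existingDelay;
  if (TryPop(out existingDelay))
  {
    // Still too early compared with the current time: stop here and never continue.
    if (existingDelay.Value > context.DateTimeUtcNow())
    {
      EndCurrentExecution();
      return new TaskCompletionSource<object>().Task;
    }

    // The scheduled time has passed: continue synchronously.
    return Task.CompletedTask;
  }

  // Zero or negative delay: nothing to wait for.
  if (scheduleAt <= date)
  {
    return Task.CompletedTask;
  }

  // First execution: record when to wake up, end this execution, never continue.
  Append(new ScheduledAt(scheduleAt));
  EndCurrentExecution();
  return new TaskCompletionSource<object>().Task;
}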

The first line of this method calls GetDateTimeUtcNow. As you can imagine, it gets the current UTC date. It has one additional property, though. Remember that this method must be executable multiple times with the same effect? This means that the result of GetDateTimeUtcNow is recorded, and when we enter the orchestration again for any reason (for example, after the process was killed), it provides the same value. Effectively, it returns the "now" of the first execution.
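Here is a minimal sketch of that recording behavior, assuming the recorded values are simply kept in order. RecordedClock, its constructor parameters and NewlyRecorded are hypothetical names. Only GetDateTimeUtcNow comes from the post. The wall clock is injected so the replay can be shown deterministically.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: the first execution reads the wall clock and records the value;
// a replay returns the recorded values in the same order.
public sealed class RecordedClock
{
    readonly Queue<DateTime> _history;
    readonly List<DateTime> _newlyRecorded = new List<DateTime>();
    readonly Func<DateTime> _wallClock;

    public RecordedClock(IEnumerable<DateTime> history, Func<DateTime> wallClock)
    {
        _history = new Queue<DateTime>(history);
        _wallClock = wallClock;
    }

    // Values read from the wall clock during this execution; these would be persisted
    // as UtcDateTimeObtained-like events.
    public IReadOnlyList<DateTime> NewlyRecorded => _newlyRecorded;

    public DateTime GetDateTimeUtcNow()
    {
        if (_history.Count > 0)
        {
            return _history.Dequeue(); // replay: the "now" of the first execution
        }

        var now = _wallClock();
        _newlyRecorded.Add(now);
        return now;
    }
}
```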

The next step calculates the date when the delay should end, that is, when the next execution is ScheduledAt.

Then we TryPop a prerecorded event. If the orchestration was already active, it left a trace, an event in the history, that we can pop. If there's an entry, we compare it with the current UTC time (context.DateTimeUtcNow()). If the orchestration should wait longer, we just mark it as one that requires ending its execution. Next, we return new TaskCompletionSource<object>().Task, which is effectively a never-ending task. This means that any continuation attached by the caller of this method, either explicitly or implicitly with await, won't be run!

If there was no event and the date Delay is scheduled for is, for some reason, not later than the current date, a completed task is returned. Otherwise, a ScheduledAt event is appended and the current execution is ended with the same pattern: by signaling that the execution won't proceed any further (EndCurrentExecution) and returning a never-completing task.

Execution status

It is the caller's responsibility to find out whether the orchestration ended or was scheduled for later execution. This is done by awaiting whichever of two tasks completes first: the task of the orchestration itself, or a task whose result is set by EndCurrentExecution.


await Task.WhenAny(
  orchestration.Execute(),
  currentExecutionEndingTask);
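The sketch below puts Delay, EndCurrentExecution and the Task.WhenAny check together in a stripped-down form, assuming no event recording at all. MiniOrchestration, SampleOrchestration, Runner.RunOnce and the simplified Delay(bool due) are hypothetical. Only the pattern follows the post: end the execution, return a never-completing task, and race the two tasks.

```csharp
using System.Threading.Tasks;

public abstract class MiniOrchestration
{
    readonly TaskCompletionSource<object> _executionEnded = new TaskCompletionSource<object>();

    // Completes when the orchestration decides to stop and be woken up later.
    public Task CurrentExecutionEndingTask => _executionEnded.Task;

    protected void EndCurrentExecution() => _executionEnded.TrySetResult(null);

    // Simplified Delay without recording: 'due' tells whether the scheduled time has passed.
    protected Task Delay(bool due)
    {
        if (due) return Task.CompletedTask;
        EndCurrentExecution();
        return new TaskCompletionSource<object>().Task; // never completes
    }

    public abstract Task Execute();
}

public sealed class SampleOrchestration : MiniOrchestration
{
    readonly bool _due;
    public bool Finished;

    public SampleOrchestration(bool due) { _due = due; }

    public override async Task Execute()
    {
        await Delay(_due);
        Finished = true; // only reached when the delay has already passed
    }
}

public static class Runner
{
    // Returns true if the orchestration ran to the end, false if it was scheduled for later.
    public static async Task<bool> RunOnce(MiniOrchestration orchestration)
    {
        var execution = orchestration.Execute();
        var first = await Task.WhenAny(execution, orchestration.CurrentExecutionEndingTask);
        return first == execution;
    }
}
```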

Summary

We saw how powerful asynchronous flows can be, especially when combined with optionally running scheduled continuations. With a simple recording of events, we were able to create orchestration tooling that is easy to use for the end user (the programmer), yet still provides interesting and powerful semantics for a time-dependent process.

Orchestrating processes for fun and profit

TL;DR

I’ve already shown here that with some trickery you can write orchestrations in C# based on async-await. It’s time to revisit this idea, now with a custom orchestration of mine.

Show me the code!

The orchestration is based on an event sourcing infrastructure that I built. This project is not public (yet), but it's similar to any other ES library. The main idea behind building an orchestration on top of it is that the state changes of an orchestration are easily captured as events like GuidGenerated, CallRecorded, UtcDateTimeObtained, etc. If so, we can model any orchestration as an event-sourced aggregate.
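Here is a minimal sketch of that idea, under the assumption that the history is just an ordered list of events. The first execution generates a Guid and records a GuidGenerated event, and a replay consumes the recorded event instead of generating a new value. OrchestrationAggregate and its members are hypothetical. GuidGenerated borrows its name from the post, but its shape is assumed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event type, named after the one listed in the post.
public sealed class GuidGenerated
{
    public GuidGenerated(Guid value) { Value = value; }
    public Guid Value { get; }
}

// Hypothetical sketch of an orchestration as an event-sourced aggregate: every
// non-deterministic value is captured as an event and replayed from the history.
public sealed class OrchestrationAggregate
{
    readonly Queue<object> _history;
    readonly List<object> _newEvents = new List<object>();

    public OrchestrationAggregate(IEnumerable<object> history)
    {
        _history = new Queue<object>(history);
    }

    // Events produced during this execution, to be persisted by the caller.
    public IReadOnlyList<object> NewEvents => _newEvents;

    public Guid NewGuid()
    {
        if (_history.Count > 0)
        {
            // Replay: the next recorded event is expected to be the matching GuidGenerated.
            return ((GuidGenerated)_history.Dequeue()).Value;
        }

        var generated = new GuidGenerated(Guid.NewGuid());
        _newEvents.Add(generated);
        return generated.Value;
    }
}
```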

Here is an example of reserving a trip, which needs to coordinate reserving a hotel and a flight. Additionally, a few calls were added just to show what the orchestration is capable of:

orchestration.png

  • Line 19: the orchestration can delay its execution. This doesn't mean it will stay in memory for that long: it records the need for a delay, to be woken up when the time comes
  • Line 22: when a Guid is generated with Orchestration.NewGuid, it will have the same value when the orchestration is executed again. The process dies and the orchestration is executed elsewhere? Don't worry, the Guid value has been recorded
  • Line 25: same with dates
  • Line 28: an external call to any system. Here, we try to ReserveHotel. 3 attempts are made with an exponential back-off. Between calls, if the configured timespan is long enough, the saga can be swapped out of memory (like with Delay)
  • Line 33: same as with hotel
  • Line 38: the compensation part
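The retry on Line 28 could look roughly like the sketch below. The post doesn't show the retry code, so several details are assumptions: the doubling factor, the initial back-off, and the names Retry.CallWithRetries and delay. Here, delay stands for the orchestration's own Delay, which is what would let a long back-off swap the saga out of memory.

```csharp
using System;
using System.Threading.Tasks;

public static class Retry
{
    // Calls 'call' up to 'attempts' times; after each failure (except the last) it waits
    // through 'delay', doubling the back-off each time (exponential back-off).
    public static async Task<bool> CallWithRetries(
        Func<Task<bool>> call, int attempts, TimeSpan initialBackOff, Func<TimeSpan, Task> delay)
    {
        var backOff = initialBackOff;
        for (var attempt = 1; attempt <= attempts; attempt++)
        {
            if (await call())
            {
                return true;
            }

            if (attempt < attempts)
            {
                await delay(backOff);
                backOff = TimeSpan.FromTicks(backOff.Ticks * 2);
            }
        }

        return false;
    }
}
```

With 3 attempts and an initial back-off of one second, a call that keeps failing waits 1 s and then 2 s before giving up.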

Execute it!

If the process failed, the machine died, etc., the execution would take the persisted events and replay them on another host. Because all the recorded events are persisted before every external call, even during a rerun all the values will be exactly the same, and calls that were already made won't be made again.
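Here is a minimal sketch of that ordering, assuming a single event type and an injected persist callback. CallRecorder, the shape of CallRecorded, persist and Flush are all hypothetical. Pending events are persisted before each new external call, and on a rerun the recorded results are returned without calling out again.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event holding the result of an external call.
public sealed class CallRecorded
{
    public CallRecorded(string result) { Result = result; }
    public string Result { get; }
}

public sealed class CallRecorder
{
    readonly Queue<CallRecorded> _history;
    readonly List<CallRecorded> _pending = new List<CallRecorded>();
    readonly Action<IReadOnlyList<CallRecorded>> _persist;

    public CallRecorder(IEnumerable<CallRecorded> history, Action<IReadOnlyList<CallRecorded>> persist)
    {
        _history = new Queue<CallRecorded>(history);
        _persist = persist;
    }

    public string Call(Func<string> externalCall)
    {
        // Rerun: the call was already made, so return its recorded result instead of calling again.
        if (_history.Count > 0)
        {
            return _history.Dequeue().Result;
        }

        // Persist everything recorded so far before reaching out to an external system.
        Flush();

        var recorded = new CallRecorded(externalCall());
        _pending.Add(recorded);
        return recorded.Result;
    }

    // Persists pending events; also called at the end of an execution. In this sketch, a call
    // whose CallRecorded event was never persisted (the process died right after it) would be
    // made again on a rerun.
    public void Flush()
    {
        if (_pending.Count == 0) return;
        _persist(_pending.ToArray());
        _pending.Clear();
    }
}
```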

Summary

Orchestrating with async-await trickery is much easier, and an orchestration can be written densely and simply, as a regular method. Having an event-sourced foundation makes it even easier, as you can reuse existing pieces and persist all orchestration events as events of some kind of aggregate.

Async programming model

TL;DR

This is a follow-up to Async pump for better throughput in Azure. Please read that post first before moving forward.

Feedback

I've received a lot of feedback about my Async pump post. In a few cases, this blog post from Ayende was quoted, as it describes exactly the same approach. You can read that post, but the comments provided by Kelly Sommers and Clemens Vasters are more meaningful.

The model

The await operator has simple semantics. It splits your method and schedules the rest of it as a continuation of the awaited task. The heavy lifting is done by the C# compiler, so you don't have to worry about it. The model of this extension is simple: define a task and a continuation. Nothing more, nothing less. My approach, however, was a "trick":

That’s the premise of the “trick” that is allegedly achieving parallel execution of I/O and compute work here. That is, however, not the purpose of the asynchronous programming model and of the Windows IO completion port (IOCP) model. The point of IOCP is to efficiently offload IO work from user code to kernel and driver and hardware and not to bother the user code until the IO work is done

by Clemens Vasters

What it basically says is that once you await an IO operation, the code that runs after it is scheduled on an IO thread:

As IO typically takes very long and compute work is comparatively cheap, the goal of the IO system is to keep the thread count low (ideally one per core) and schedule all callbacks (and thus execution of interleaved user code) on that one thread. That means that, ideally, all work gets serialized and there minimal context switching as the OS scheduler owns the thread. The point of this model is ALREADY to keep the CPU nicely busy with parallel work while there is pending IO.

by Clemens Vasters

So, when your code awaits some IO, you call the IO, and then the code after await is executed on the IO thread, as it's assumed to be lightweight. When another IO occurs, the same thread dispatches the callback again. The async pump I proposed comes with a danger: when we follow this partially awaitless approach, the continuation code is

not on a .NET poll thread, but an IO pool thread. The strategy above sits on that IO pool thread past the point where you are supposed to return it (which is the next IO call) and thus you might force the IO pool to grow

by Clemens Vasters

Summary

Measure first. Think. Then think again. The "trick" worked in my case, improving the initialization performance of one app (which is where I needed it in the first place). Does it work in every case, and should it be used in general? I'd say no. It's good to follow the programming model. If you don't want to, you must have strong reasons for it.

Async pump for better throughput in Azure

This post is followed up by 
https://blog.scooletz.com/2017/02/20/async-programming-model

TL;DR

Introducing async-await has changed a lot. Now, with some help from the compiler, we're able to squeeze more throughput out of our machines, which may lower costs. In this blog post we'll push the boundaries even further by questioning the need to await a task immediately.

Background

The story behind this pattern is simple. I'm using a part of Azure Storage Services, the page blob. It provides a storage API targeted at random IO and page-aligned reads and writes. This is a perfect solution for emulating disk IO (it's used for VMs' disks), but it can also be used whenever you want to write to a file in Azure at a specific offset. You can just read a page, modify it and write it back. If you're interested in this topic, take a look at my talks; maybe we'll meet during my presentation Keep Its Storage Simple Stupid.

Synchronous

It's obvious that for reading from and writing to Storage Services you want to use async code. Blocking calls like the one below freeze a thread, which, considering the money you already pay, is not the best option. Remember, it's the cloud, and regular Storage Services are backed by HDDs. It might take a while. Still, let's take a look at the sync version first. We'll operate on a stream that has been opened from the blob.

pump.png

The method above fills the buffer from a stream, dispatching one read after another until the buffer is full.
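Here is a minimal sketch of such a synchronous read loop. It assumes ReadBuffer takes the stream and the buffer to fill; the exact signature is a guess. Stream.Read is the standard blocking call.

```csharp
using System.IO;

public static class PageReader
{
    // Fills the buffer by dispatching one Read after another until the buffer is full
    // or the stream ends. Returns the number of bytes actually read.
    public static int ReadBuffer(Stream stream, byte[] buffer)
    {
        var total = 0;
        while (total < buffer.Length)
        {
            // Blocks the calling thread until the storage responds.
            var read = stream.Read(buffer, total, buffer.Length - total);
            if (read == 0)
            {
                break; // end of stream
            }

            total += read;
        }

        return total;
    }
}
```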

Asynchronous

The async-ified version of this reader is not that different. It just uses async Task in its signature and awaits the read. We'll have no blocking calls, leaving some spare CPU cycles for other operations.
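Under the same assumed signature, the async counterpart differs only in its return type and in awaiting Stream.ReadAsync:

```csharp
using System.IO;
using System.Threading.Tasks;

public static class AsyncPageReader
{
    // Same loop as the synchronous version, but the thread is released while each read is in flight.
    public static async Task<int> ReadBufferAsync(Stream stream, byte[] buffer)
    {
        var total = 0;
        while (total < buffer.Length)
        {
            var read = await stream.ReadAsync(buffer, total, buffer.Length - total);
            if (read == 0)
            {
                break; // end of stream
            }

            total += read;
        }

        return total;
    }
}
```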

pump

Asynchronous pump

For the last attempt, we need to ask: what do we read this buffer for? In my case, it's for scanning over its content. I need to read a page blob from a given position and scan/deserialize it in C# code. I do not want to keep the buffer; as soon as I've read it, I can move on. It's just about reading a log, nothing more. The second property is that entries in this log are aligned to pages, so they are well aligned for reading. Can we modify the reading part, then?

We could think of using two buffers/streams. Schedule a read in the first, then in a loop:

  1. schedule a read in the second
  2. await on the first
  3. swap first and second
  4. go to 1

If we used this algorithm, we'd have a higher probability that one operation has already finished and its data is ready to be processed. This means that our algorithm could possibly work on prefetched data without any interruptions, having the data ready when it needs it. For the sake of simplicity, the buffer array and ReadBuffer were enclosed in a simple helper class called Buffer.

pump.png
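The two-buffer loop described above can be sketched as follows. This is not the code from the listing. Instead of two streams, it assumes a hypothetical positioned read, readPageAsync(offset, buffer), similar in spirit to a page blob range read, so that two reads can be in flight without sharing a stream position. PumpAsync and consume are hypothetical names as well.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncPump
{
    // readPageAsync(offset, buffer) fills buffer from offset and returns the number of bytes
    // read (0 past the end); consume scans a filled buffer.
    public static async Task PumpAsync(
        Func<long, byte[], Task<int>> readPageAsync, int pageSize, Action<byte[], int> consume)
    {
        var first = new byte[pageSize];
        var second = new byte[pageSize];
        long offset = 0;

        // Schedule a read in the first buffer.
        var firstRead = readPageAsync(offset, first);
        offset += pageSize;

        while (true)
        {
            // 1. Schedule a read in the second buffer, so it runs while the first is processed.
            var secondRead = readPageAsync(offset, second);
            offset += pageSize;

            // 2. Await on the first.
            var count = await firstRead;
            if (count == 0)
            {
                await secondRead; // observe the outstanding prefetch before leaving
                return;
            }

            consume(first, count);

            // 3. Swap first and second (buffers and their pending reads).
            var tmp = first;
            first = second;
            second = tmp;
            firstRead = secondRead;
            // 4. Go to 1.
        }
    }
}
```

While consume scans one buffer, the read into the other is already in flight, and the buffer being consumed is never the one being written to.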

Summary

Having something ready to be awaited does not mean that you should await it immediately. Using this two-buffer approach can increase the throughput of your algorithm by ensuring that data is fetched before it is needed. Now it's time for you to search for some pumping potential in your code!

Concurrent conditional deletes

TL;DR

Sometimes the collections in System.Collections.Concurrent don't provide enough methods to squeeze top performance out of them. Below you can find a small extension method that will make your life easier in some cases.

Concurrent dictionary

The concurrent dictionary provides a lot of useful methods. You can TryAdd or AddOrUpdate. Additionally, you can use TryUpdate, which provides a very nice way of ensuring optimistic concurrency. Let's take a look at its signature:

public bool TryUpdate(TKey key, TValue newValue, TValue comparisonValue)

It replaces the value under a specific key with newValue only if the current value is equal to comparisonValue. It's an extremely powerful API. If you create a new object to update the value for a specific key, it lets you replace that object without locks, with a single method call. Of course, if the method fails, it returns false and it's up to you to retry.
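As a minimal sketch of the retry loop that TryUpdate leaves to you, here is an optimistic increment of a counter stored under a key. OptimisticCounter is a hypothetical name. TryGetValue, TryAdd and TryUpdate are the real ConcurrentDictionary methods.

```csharp
using System.Collections.Concurrent;

public static class OptimisticCounter
{
    // Increments the value under key without locks, retrying whenever another thread
    // changed (or removed) the value between the read and the update.
    public static int Increment(ConcurrentDictionary<string, int> counters, string key)
    {
        while (true)
        {
            if (!counters.TryGetValue(key, out var current))
            {
                if (counters.TryAdd(key, 1)) return 1;
                continue; // someone added the key concurrently: read again
            }

            var next = current + 1;
            if (counters.TryUpdate(key, next, current)) return next;
            // TryUpdate returned false: the value is no longer 'current', so retry.
        }
    }
}
```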

What about deletes? What if I wanted to remove an entry only if nobody changed its value in the meantime? What if I wanted optimistic concurrency for deletes as well? Is there a method for it? Unfortunately, no. The only method for removal is

public bool TryRemove(TKey key, out TValue value)

which removes the value unconditionally and returns it. This breaks optimistic concurrency, as we can't ensure that the removed entry wasn't modified. What can be done to make it conditional?

Explicit interfaces

The ConcurrentDictionary class implements a lot of interfaces, one of which is

ICollection<KeyValuePair<TKey, TValue>>

This interface has one particular method that removes a key-value pair:

bool ICollection<KeyValuePair<TKey, TValue>>.Remove(KeyValuePair<TKey, TValue> kvp)

If you take a look at the implementation, it uses a private method of the dictionary to remove the key only if its value is equal to the value of the pair. Now we can write a simple extension method that provides a conditional, optimistically concurrent removal of a key:

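using System.Collections.Concurrent;
using System.Collections.Generic;

static class ConcurrentDictionaryExtensions
{
    // Removes the entry for key only if its current value equals previousValueToCompare
    // (compared with EqualityComparer<TValue>.Default). Returns false otherwise.
    public static bool TryRemoveConditionally<TKey, TValue>(
        this ConcurrentDictionary<TKey, TValue> dictionary, TKey key, TValue previousValueToCompare)
    {
        // The explicitly implemented ICollection<KeyValuePair<TKey, TValue>>.Remove
        // matches on both the key and the value.
        var collection = (ICollection<KeyValuePair<TKey, TValue>>)dictionary;
        var toRemove = new KeyValuePair<TKey, TValue>(key, previousValueToCompare);
        return collection.Remove(toRemove);
    }
}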

which closes the gap and makes the API of the concurrent dictionary support all the operations under optimistic concurrency.
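Here is a short usage sketch, in which a value is removed only if it is still the instance we read earlier. Session is a hypothetical type. Because it doesn't override Equals, the comparison done through EqualityComparer<TValue>.Default boils down to reference equality.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

static class ConcurrentDictionaryExtensions
{
    // Same extension as above: removes key only if its current value equals previousValueToCompare.
    public static bool TryRemoveConditionally<TKey, TValue>(
        this ConcurrentDictionary<TKey, TValue> dictionary, TKey key, TValue previousValueToCompare)
    {
        var collection = (ICollection<KeyValuePair<TKey, TValue>>)dictionary;
        return collection.Remove(new KeyValuePair<TKey, TValue>(key, previousValueToCompare));
    }
}

// Hypothetical value type for the example; compared by reference.
sealed class Session
{
    public Session(int version) { Version = version; }
    public int Version { get; }
}
```

If another thread replaced the session in the meantime, the removal fails and the newer session stays in place.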

Summary

With this simple tweak, you can use a concurrent dictionary as a collection that fully supports optimistic concurrency.

Concurrency – ramp up your skills

Yesterday, I gave my Extreme Concurrency talk at the rg-dev user group. After the talk I had some really interesting discussions and was asked to provide some resources on the low-level concurrency I was talking about. So here's a list of books, talks and blog posts that can help you ramp up your skills.

Videos:

  1. [C++] Herb Sutter "atomic Weapons" – it's about C++, but it covers memory models in a way that makes it easy to follow and learn how they work
    1. Part 1
    2. Part 2

MSDN:

  1. .NET Volatile class – it has a good description of what half-barriers are and properly shows the two counterparts, the Read and Write methods
  2. .NET Interlocked class – the other class with a good description, providing methods that are executed atomically. Basically, on x86/x64 most of these methods are JIT-compiled to single atomic instructions.

Code:

  1. RampUp – a project of mine 🙂
  2. [JAVA] Aeron – the messaging library

Books:

  1. Concurrent programming on Windows by Joe Duffy – this is a hard book to go through. It’s demanding and requires a lot of effort but is the best book if you want to really understand this topic

Blogs:

  1. Volatile reads and writes by Joe Duffy
  2. Sayonara volatile by Joe Duffy
  3. Atomicity, volatility and immutability are different by Eric Lippert – that’s the last part of this series
  4. [JAVA] Psychosomatic, lobotomy, saw – the name is strange, but you won't find disturbing videos there. What you will find, though, is a deep dive into memory models.