C++11: Future Chaining For Easy, Highly Threaded Execution

Highly multi-threaded computing is here now, and it cannot be ignored!
Copyright Dr Alexander J Turner.
Multi-threading with pure functions should be easy. With C++11 it is starting to be. Here I show how to use pure lambda expressions, type inference and nested futures to produce a highly multi-threaded system.

Having read a post at Sutter's Mill I got thinking about method chaining in C++. I am very keen on method chaining in Java and so was pleased to see the future().then() pattern being proposed for standard C++. This got me thinking about chaining futures x.y.z etc. The idea is that each task (x, y and z) could be run on a different thread according to the load of the machine. It is a bit like thinking of the tasks as very simple actors.
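.then() itself did not make it into C++11, but the idea is easy to sketch on top of std::async. The helper below is my own illustration, not a standard API (the name then and the toy continuation are just for this example): the continuation simply runs in a fresh async task that blocks until the input future is ready.

#include <future>
#include <utility>

// A minimal then() sketch built from std::async - my illustration, not
// a standard API. The continuation runs in a new async task that blocks
// on the input future.
template<typename T, typename F>
auto then(std::future<T> f, F cont)
  -> std::future<decltype(cont(std::declval<T>()))>
{
  return std::async
  (
    std::launch::async,
    [](std::future<T> f, F cont){ return cont(f.get()); },
    std::move(f), std::move(cont)
  );
}

Used like auto f2 = then(std::move(f1), [](long x){ return x * 2; });, it hands back a new future for the transformed result. Note the cost: each chained stage occupies a (mostly sleeping) software thread, which is exactly the trade-off explored below.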

I figured I could write this idea up in an hour or so before work and set off coding. What I came up with looks nothing like what I had in mind, but it really interests me and is actually rather simpler in a lot of ways (it gets rid of the memory-management challenges caused by passing data between the tasks).

Before Continuing We Need To Think About Threads A Little:
The key to understanding what is going on with this code is to realise the difference between hardware threads and software threads. I developed this code on a twin-socket, quad-core Xeon machine. That gives me 8 true hardware threads. My code can have very many more software threads. However, at any one instant in time the machine can only be doing 8 things.

Having M software threads on N hardware threads, where M>N, sounds like a limitation, but it is actually the opposite. It allows us to nest futures.
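As an aside, C++11 will even tell you N. A quick sketch using std::thread::hardware_concurrency(), which is allowed to return 0 if the implementation cannot tell:

#include <iostream>
#include <thread>

int main(){
  // Number of true hardware threads; 0 means the implementation cannot tell.
  unsigned n = std::thread::hardware_concurrency();
  std::cout << "Hardware threads: " << n << std::endl;
  return 0;
}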

x(do_stuff, y(do_stuff, z(do_stuff).get()).get()).get()

This model means that the thread running y will block waiting for z. Consequently, x will block waiting for y and z. But this does not matter, because the blocked threads consume no CPU resource, just a little bit of housekeeping resource in the program and the kernel.

Here is a typical async call invoking a lambda where all the type information is inferred. Note the comma operator in the return statement: the expressions run left to right and the value of the last one, i, is what the lambda returns:

              async
              (
                [](long i)
                {
                  // Subtract 3, burn a second of CPU, then yield i as the result.
                  return i -= 3, wait_long_time(),
                  i;
                },
                i
              ).get();


I don't mind when or on what core/socket the computer runs x, y and z, as long as it performs them in the order I specified: do x's stuff, then y's, then z's. We leave all the details to the implementation. We don't need to think about thread pooling, task queuing or anything else. All the hard stuff is taken care of for us.

Where We Get The Real Advantage:
So far we are using threads and tasks to mimic what a single thread of execution would do. The real benefit comes when we have multiple task chains running at the same time. I am calling x-y-z a task chain. It has to occur in order, but that is the only limitation.

BUT: modern systems have pre-emptive multi-tasking, so we do not strictly need these chains to achieve the load spreading I discussed (see my second BUT below for the counter-argument).

So What Use Is It Really:
The code below is a curiosity. It does a nice job of showing how software and hardware threads interact. More importantly, it is a complete separation of thread allocation and task allocation, with a very lightweight, standard C++11 syntax and no third-party library! I am sure this approach has potential uses in creating lightweight threading models for C++11 applications.


BUT: stop, there is a real-world benefit to this approach even as it stands! I am not really making full use of it here; however, async does not just go off and create a new thread. It can alternatively use a pooled thread or the caller's thread. This is down to the implementation. So my approach takes advantage of the infrastructure of the C++11 implementation without having to interact with it explicitly. As such, it should be a much more stable approach than relying on the pre-emptive threading of the OS to make our stuff work.
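For reference, C++11 lets you pin that choice down with a launch policy. A minimal sketch (the lambdas are just placeholders): std::launch::async forces a new thread, std::launch::deferred runs the work lazily on the thread that calls get(), and the default leaves the implementation free to pick either.

#include <future>
#include <iostream>

int main(){
  // Forced onto its own thread.
  auto f1 = std::async(std::launch::async, []{ return 1; });
  // Deferred: runs lazily on whichever thread calls get().
  auto f2 = std::async(std::launch::deferred, []{ return 2; });
  // Default policy: the implementation picks either strategy.
  auto f3 = std::async([]{ return 3; });
  std::cout << f1.get() + f2.get() + f3.get() << std::endl; // prints 6
  return 0;
}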

In the example below I spin the CPU for 1 second in the third task using the chrono package. This causes the code to execute in effective batches of 8 (spread over my 8 hardware threads). To see this effect you can view the image at the bottom of this post (which is animated).

#include "stdafx.h"
#include <future>
#include <iostream>
#include <functional>
#include <thread>
#include <chrono>
using namespace std;

namespace nerds_central{
#define N_TASKS 64
  typedef std::chrono::high_resolution_clock Clock;
  typedef std::chrono::duration<double, std::milli> Milliseconds;

  // Busy-wait to munch up roughly 1 second of CPU time.
  void wait_long_time(){
    auto t1 = Clock::now();
    while(true){
      auto t2 = Clock::now();
      // Express the elapsed wall-clock time in milliseconds.
      Milliseconds elapsed = t2 - t1;
      if(elapsed.count() > 1000) break;
    }
  }

  void run(){
    future<long> as[N_TASKS];
    // Launch N_TASKS task chains. Each chain is x (increment) nesting
    // y (double) nesting z (subtract 3, then burn a second of CPU);
    // every stage blocks on its child's future with get().
    for(long i=0;i<N_TASKS;++i){
      as[i] = async
      (
        [](long i)
        {
          return ++i,
          async
          (
            [](long i)
            {
              return i*=2,
              async
              (
                [](long i)
                {
                  return i-=3,wait_long_time(),
                  i;
                },
                i
              ).get();
            },
            i
          ).get();
        },
        i
      );
    }
    // Collect the results; each get() blocks until that chain completes.
    for(long i=0;i<N_TASKS;++i){
      cout << "Answer for seed " << i << " is " << as[i].get() << endl;
    }
  }

  // Run toRun count times, printing the wall-clock milliseconds of each run.
  void timeRun(long count,std::function<void()> toRun){
    for(;count>0;--count){
      auto t1 = Clock::now();
      toRun();
      auto t2 = Clock::now();
      Milliseconds elapsed = t2 - t1;
      std::cout << elapsed.count() << '\n';
    }
  }
}

int _tmain(int argc, _TCHAR* argv[])
{
  nerds_central::timeRun(1,nerds_central::run);
  return 0;
}

The Output:
In the animation below we can see the printout appearing in blocks of 8 as the tasks are spread over the 8 hardware threads: