Concurrency : concurrent<T> part 2

concurrent<T> part I described a powerful wrapper that made all calls to the object to be executed asynchronously and in FIFO order. Using a lambda call made it easy to bundle several actions on the concurrent<object> in one asynchronous operation.

This is great news for coders who wants a foolproof background threaded object that manages well initialization, shutdown and with a thread-safe call API. No more start-up races or weird shutdown behaviour!

Sutter’s concurrent<T> is a powerful pattern. With some small changes and g3log tech import, we can boost it to be even better. Let’s make it foolproof to use, easier to call and with the power to handle derived types and not only concrete types.

Let’s take a step towards making concurrency as easy as we want it to be. At the same time let’s take a look at how std::packaged_task, std::future and std::exception_ptr are glued together to make this work great as well as being coder-user-friendly.

The finished and improved concurrent<T> can be found at:
github.com/KjellKod/Concurrent

Simplified and Safer Calling

Bundling several operations on the wrapped object and executing them in one asynchronous action can be done through the lambda call shown in part I. For a single call it makes more sense to avoid the verbose lambda and go for a direct call API.

struct Greeting {
 std::string sayHello() { return {"Hello"}; }
};

concurrent<Greeting> gossip;
/*(1) as shown in Concurrent<T>, part I */
auto greet1 = gossip([] (Greeting& obj) { return obj.sayHello(); }); 

/*(2) kjellkod way*/
auto greet2 = gossip.call(&Greeting::sayHello);

PRO: By using the direct call (2) approach it is safer and simpler to make the asynchronous call.

auto greet2 = gossip.call(&Greeting::sayHello);

This is an easier syntax than the lambda call. With (2) you can only use the public API of T as it was intended. This makes it less error prone to use and you do not have to fear that the call act outside its thread context (i.e lambda expression and [&] capture).

CON: The function pointer syntax cannot easily distinguish between overloaded functions. You must specify exactly the function signature. An example is to call append on a concurrent<std::string>. This will be tricky since the std::string::append has several overloads

concurrent<std::string> cs;
cs.call(&std::string::append, "Hello World"); // won't compile

To use the call API in an overload situation you must specify the function signature exactly right. This is easiest to achieve through a typedef. The exact function signature that we are interested in is:

std::string& append(const std::string& str)

Making it happen through a typedef:

concurrent<std::string> cs;

// 1) the function-name is typedef'ed to "append_func"
// 2) the function signature can be read as:
//    returning a "std::string&"  and taking a "const std::string& 
//    as parameter"
typedef std::string& (std::string::*append_func)(const std::string&);
append_func appender = &std::string::append;

auto response = cs.call(appender, "Hello World");


Controlled life-span using std::unique_ptr<concurrent<T>>

Moving the concurrent<T> into a std::unique_ptr can be helpful for controlling its life-span. Using the lambda API would then look something like this:

std::unique_ptr<concurrent<Greeting>> gossip { /* stuff */ };
auto greet = gossip->operator()([] (Greeting& obj) { /* ... */ });

Unfortunately the function call operator() looks alien when used together with a pointer.

gossip->operator()([]....)

Since we used ->call() for the direct call API maybe ->lambda() would work here. At least lambda is kind of self explanatory. If you don’t like this then just replace it for
operator()(/*...*/).

I will let you be the judge of which one to choose:

auto greet1 = gossip->operator()([] (Greeting& obj) { ... });
//    VS
auto greet2 = gossip->lambda([] (Greeting& obj) { ... });

As mentioned earlier you can also skip the lambda and just use the direct call API

auto greet = gossip->call(&Greeting::/*some function*);


Abstract and Derived Types

The original concurrent<T> could not easily handle abstract types. such as

struct Animal { 
  virtual std::string sound() = 0; 
};

struct Dog : public Animal { 
  std::string sound() override { return {"Wof Wof"}; } 
};

By internally using a (unique) pointer instead of a concrete object we can deal with polymorphism. We will keep the same initialization API as before with the addition that you can also pass in a already constructed unique_ptr.

concurrent<Animal> animal1{std::unique_ptr<Animal>(new Dog)}; 
EXPECT_EQ("Wof Wof", animal1.call(&Animal::sound);

If you would be so silly as to put in a std::unique_ptr containing a nullptr then the nullptr object will not be called. Instead an exception will be stored in the returned std::future.

How the asynchronous scheduling works

Let’s recap some that was shown only in code earlier. The concurrent<T> wrapper has a condition variable empowered message queue (cudos to Anthony Williams). When the queue is empty the concurrent<T>‘s thread sleeps. When a job is added to the queue the thread wakes up, pops it and executes the job.

concurrent(std::unique_ptr<T> worker) 
 : _worker(std::move(worker))
 , _done(false)
 // the std::thread _thd runs the job scheduling loop
 , _thd([ = ]{
        Callback call;
        while (_worker && !_done) {
          _q.wait_and_pop(call);  
          call(); // the  lambda or direct call function
        }         // executes
    }) 
...


Executing a function call

When the a concurrent call is executed, such as shown below, the call will be packaged and later executed by the background scheduler.

struct Hello { void foo(){...} };
concurrent<Hello>  h;
std::future<void> result = h.call(&Hello::foo);

The somewhat simplified code below shows what happens when doing a concurrent<T>::call.

template<typename Call, typename... Args>
auto call(Call call, Args&&... args) const -> std::future<ReturnedType>  {/*1*/

 /*2*/
   typedef typename std::result_of<decltype(call)(T*, Args...)>::type result_type;
   typedef std::packaged_task<result_type()> task_type;  


   if (empty()) {  /*3*/                                        
      auto p = std::make_shared<std::promise<result_type>>();
      std::future<result_type> future_result = p->get_future();
      p->set_exception(std::make_exception_ptr(std::runtime_error
                                          ("nullptr instantiated worker")));
      return future_result;
   }

   /* 4 */
   auto bgCall = std::bind(call, _worker.get(), std::forward<Args>(args)...);  
   task_type task(std::move(bgCall));    /*5*/ 
   std::future<result_type> result = task.get_future();
   _q.push(MoveOnCopy<task_type>(std::move(task)));  /*6*/
   return std::move(result); /* 7 */
}

1. std::future<ReturnedType>

The real line for std::future<ReturnedType> could not easily fit on the page. ReturnedType stands for:

typename std::result_of< decltype(func)(T*, Args...)>::type

This template expression will set the return type inside the ::type. As far as I know it is the most straight forward way to retrieve the return type from a member function. Other ways to get this information are discussed at stackoverflow.

The retrieval of a member function’s return type is described in greated detail in a previous blog post. You can also see a code example for it here [ideone][coliru]

2. Packaged asynchronous operation

A packaged_task will wrap our object with the given function. The task is a function object that will put the result of its operation in a std::future. If an exception is thrown during the operation, that too will be saved into the std::future.

3. Check for empty

In case the inner std::unique_ptr only contains a nullptr then an appropriate exception is set through the exception_ptr in the returned std::future

4a. Preparing the task for asynchronous execution

The task (2) wraps our internal object and the given function call. It is put onto the task queue. The background thread will pop the task and execute it. No special handling for exceptions are needed. The task handles all that internally.

4b. std::bind and not lambda for parameter pack

Due to weak compiler support for expanding parameter packs in a lambda we have to use a workaround. The best way to manage the parameter pack is to fall back on the trustworthy std::function.

auto bgCall = 
     std::bind(call, _worker.get(), std::forward<Args>(args)...);  //workaround
   task_type task(std::move(bgCall));    
   std::future<result_type> result = task.get_future();
   _q.push(MoveOnCopy<task_type>(std::move(task)));

With better compiler support the “workaround” line could instead be written

auto bgCall = [&, args...]{ return (_worker.*call)(args...); };





That’s it…

With the improved concurrent<T> you can use the safer, single call, API or you can use the lambda API and bundle several operations together in one asynchronous action.

Enjoy: github/KjellKod/Concurrent

What’s next?

Currently I have no plans to extend this further but it is still tempting to play with two new features.

Are any of them of special interest to you? Don’t hesitate to give a comment with a +1 for your preferred add-on.

  1. thread loop: From here it would be fairly simple to have an optional thread “loop”. A function that is being continuously scheduled to run. Adding this and all of a sudden you have a continuous worker thread that can work at its specific task as well as being responsive for API calls. Initialization and shutdown logic is a breeze since it is all handled by the concurrent<T>

  2. asynchronous signal-and-slot communication: Possibly overkill but something I have wanted to write for years. A signal-and-slot communication mechanism that is :

    • Scope thread-safe: Objects that goes out of scope cannot be called. The power of reference counting and weak-pointers makes this less hard than it sounds. There are a number of signal-and-slots libraries that implement this already.

    • Tread to thread calling: Making it foolproof to use between threads. It must be impossible to send a signal that executes in the same threads context (i.e the wrong thread context) on another object. Similar to Qt’s signal concept there should be a dispatcher that takes the signal and executed them in the receiving threads scope. Last time I checked only Qt’s signal-and-slot library had this essential feature.
    • .

If you think about it, there is just a little bit of glue needed to make 1) work with concurrent<T>. The thread-dispatching part of the signal-and-slot library 2) can also be handled by concurrent<T> but obviously writing that library would involve more effort.

About kjellkod

Software Engineer by trade, (former?) Martial Artist and Champion by strong will and small skill... and a Swede by nationality :)
This entry was posted in C++, concurrency, Software Engineering. Bookmark the permalink.

2 Responses to Concurrency : concurrent<T> part 2

  1. Pingback: Concurrency : Concurrent<T> (wrapper), part 1 | Kjellkod's Blog

  2. +1 For advertising direct member calls as well. Lambdas are getting overused lately.

    About “What’s next”. Having used something similar to this, cancellation is very important and completely changes the game. Cancellation of active task, queued tasks, and if it is a global queue – cancellation for tasks for a specific user (submitter).
    If you mix cancellation with asynchronous cross-thread communication, things start to get pretty non-trivial because one must ensure posted ready_work must not reach the user if the task was canceled. Suddenly “work in progress” is not just a flag on the thread, but all pending result(s) from that work in the current thread as well.

Leave a comment