Concurrency : Concurrent<T> part 1

At C++ and Beyond 2012, Herb Sutter presented a very useful concurrency pattern that is an improvement over the “active object“.

concurrent wraps any object, and all calls to the object are executed asynchronously and in FIFO order.

Basically it is an improved active object that can wrap any object. The improvement over the active object pattern is that the boilerplate code needed to make an object asynchronous is completely removed! The asynchronous function call is forwarded from the wrapper to the actual object through a message queue.

// "hello" will be copied into a temporary std::string
// that will the input to the concurrent
// wrapper
concurrent<std::string> async_string{"hello"};

// Calling it is done through a lambda. The call
// is asynchronous. The job is executed in a background thread
async_string( [](std::string s) { s.append("world"); } );

The API call is always made through a lambda or a std::function.

The lambda or std::function is pushed onto a message queue. When it is later executed, it is called with the wrapped object passed in by reference.

This is very handy, as you can access any public function of the wrapped object. Every call is asynchronous and the calls are executed in FIFO order in the background, just as if you had used an active object. A call can also return a value, which comes back through a std::future, as shown below.
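Continuing the async_string example from above, a minimal sketch of a value-returning call might look like this (the result is delivered through a std::future, which is explained further down):

// The lambda receives the wrapped std::string by reference.
// The returned std::future will eventually hold the result.
auto future_size = async_string(
   [](std::string& s) { return s.size(); });

// get() blocks until the background thread has executed the task
std::cout << "size: " << future_size.get() << std::endl;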

Improvement over the original

In Herb’s example the object is copied into the concurrent wrapper.

template<typename T> class concurrent {
...
public:
concurrent( T t_ = T{} ) : ...

I was not too keen on that approach, since I would like to use the pattern for any object, even if the copy constructor is not accessible (unique_ptr, anyone?).

A slightly different approach is to take the constructor arguments for the wrapped type and construct the object in place. Using variadic template arguments makes this very flexible.

template<typename T> class concurrent {
mutable T _worker;
...
public:
template<typename ... Args>
concurrent(Args&&... args) : _worker(std::forward<Args>(args)...)
...

With the modified concurrent wrapper the arguments are used to create the worker object in place; a copy of the object is not needed. Voilà, we can now use the concurrent wrapper with objects that cannot be copied, such as std::unique_ptr. This will come in handy if you want to create “concurrent collections”, a topic for an upcoming g3log-related blog post.

Below you can see an example of this.

struct Animal {
   virtual ~Animal() = default; // needed so unique_ptr<Animal> destroys the derived type correctly
   virtual void sound() = 0;
};

struct Dog : public Animal {
   void sound() override {
      std::cout << "Wof Wof" << std::endl;
   }
};

struct Cat : public Animal {
   void sound() override {
      std::cout << "Miauu Miauu" << std::endl;
   }
};

TEST(TestOfConcurrent, CompilerCheckUniquePtrTest) {
   typedef std::unique_ptr<Animal> RaiiAnimal;
   concurrent<RaiiAnimal> animal1 {new Dog};
   concurrent<RaiiAnimal> animal2 {new Cat};

   auto make_sound = [](RaiiAnimal& animal) { animal->sound(); };
   animal1(make_sound);
   animal2(make_sound);
}

Executing a lambda task

Below is an outline of the code that runs when a lambda task is added for execution at some point in the future. The lambda task is executed when the background thread pops the function-message from the queue and calls it.

template<typename F>
auto operator()(F func) const ->std::future<decltype(func(_worker))> {    // 1
   auto p = std::make_shared<std::promise<decltype(func(_worker))>> ();    // 2
   auto future_result = p->get_future(); // 3
   _q.push([ = ]{                                         // 4
      try {
         concurrent_helper::set_value(*p, func, _worker); // 5
      } catch (...) {
         p->set_exception(std::current_exception()); } // 6
   });
   return future_result;
}

As you can see above, it is quite an elegant and easy-to-follow piece of code. The return type (1) uses the trailing-return syntax instead of a left-side return type. A function with a trailing return type is generally easier to read than one with a left-side return type when the return type is a non-trivial template calculation.

// example
auto func() -> long_template_calculation { /* stuff */ }

   vs.

long_template_calculation func() { /* stuff */ }

The lambda task’s std::future return value is retrieved from a std::promise (2 + 3). The std::promise is held in a shared_ptr. An internal lambda (4) with access to the shared promise is put on the message queue; it contains the instructions for how to execute the task. If the execution of the function (4) goes OK, the return value is stored in the promise (5). If the lambda task throws during execution, the exception is saved (6) instead.

The returned std::future contains the result of the task.
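A small sketch of how this plays out from the caller’s side, assuming the async_string wrapper from earlier: a task that throws does not bring down the background thread; the exception surfaces when the future is queried.

// The task throws instead of returning a value
auto bad_result = async_string(
   [](std::string&) -> size_t { throw std::runtime_error("oops"); });

try {
   bad_result.get();   // re-throws the exception stored in the promise
} catch (const std::runtime_error& e) {
   std::cout << "caught: " << e.what() << std::endl;
}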

It is an elegant solution. Kudos to Herb for sharing it: Channel 9.

Complete implementation

Working code example at Coliru. Complete source code at https://github.com/KjellKod/blog_sutter_concurrent.

#pragma once
#include <shared_queue.hpp>   
#include <thread>
#include <future>
#include <functional>

namespace concurrent_helper {
   typedef std::function<void() > Callback;

   // Stores the result of f(t) in the promise.
   template<typename Fut, typename F, typename T>
   void set_value(std::promise<Fut>& p, F& f, T& t) {
      p.set_value(f(t));
   }

   // Overload for functions that return void: run f(t),
   // then signal completion through the promise.
   template<typename F, typename T>
   void set_value(std::promise<void>& p, F& f, T& t) {
      f(t);
      p.set_value();
   }
} // concurrent_helper


template <class T> class concurrent {
   mutable T _worker;
   mutable shared_queue<concurrent_helper::Callback> _q;
   bool _done; 
   std::thread _thd;

   
public:
   // The background thread pops and executes one queued task at a time (FIFO)
   template<typename ... Args>
   concurrent(Args&&... args)
   : _worker(std::forward<Args>(args)...)
   , _done(false)
   , _thd([ = ]{concurrent_helper::Callback call;
      while (!_done) {
         _q.wait_and_pop(call); call();
      }}) {
   }

   // Shutdown: a final queued task flips _done on the background thread, then we join it
   virtual ~concurrent() {
      _q.push([ = ]{_done = true;});
      _thd.join();
   }


   template<typename F>
   auto operator()(F f) const ->std::future<decltype(f(_worker))> {
      auto p = std::make_shared<std::promise<decltype(f(_worker))>> ();
      auto future_result = p->get_future();
      _q.push([ = ]{
         try {
            concurrent_helper::set_value(*p, f, _worker);
         } catch (...) {
            p->set_exception(std::current_exception()); }
      });
      return future_result;
   }
};
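The listing depends on shared_queue.hpp, which is part of the linked repository and not shown here. For reference, a minimal sketch of the interface that concurrent<T> needs (push and wait_and_pop) could look like the following; this is an illustrative stand-in using a plain mutex and condition_variable, not the repository’s actual shared_queue:

#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>

// Minimal sketch of the queue interface used by concurrent<T>.
template<typename T>
class shared_queue {
   std::queue<T> _queue;
   mutable std::mutex _m;
   std::condition_variable _cv;

public:
   void push(T item) {
      {
         std::lock_guard<std::mutex> lock(_m);
         _queue.push(std::move(item));
      }
      _cv.notify_one();
   }

   // Blocks until an item is available, then moves it into 'out'.
   void wait_and_pop(T& out) {
      std::unique_lock<std::mutex> lock(_m);
      _cv.wait(lock, [this] { return !_queue.empty(); });
      out = std::move(_queue.front());
      _queue.pop();
   }
};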

See the follow-up: Concurrent<T> part II: improvements.
