TSM - Editoria 17

Ovidiu Mățan - Fondator @ Today Software Magazine

In the previous examples we discussed ways to protect data shared between multiple threads. Sometimes protecting shared data is not enough, and the operations executed by different threads must also be synchronized. Typically, one thread has to wait until an event occurs or until a condition becomes true. To this end, the C++ Standard Library provides primitives such as condition variables and futures.

The C++11 Standard provides not one but two implementations of condition variables: std::condition_variable and std::condition_variable_any. Both are declared in the <condition_variable> header. To facilitate communication between threads, a condition variable is used together with a mutex in the case of std::condition_variable, or with any other mechanism that provides mutual exclusion in the case of std::condition_variable_any.

A thread waiting on a condition variable must first lock a mutex using the std::unique_lock primitive, the necessity of which we shall see later. The mutex is atomically unlocked when the thread starts waiting on the condition variable. When a notification is received for that condition variable, the thread is woken up and locks the mutex again.

A practical example may be a buffer that is used to transmit data between two threads:

#include <condition_variable>
#include <mutex>
#include <queue>

std::mutex mutex;
std::queue<buffer_data> buffer;
std::condition_variable buffer_cond;

void data_preparation_thread()
{
    while(has_data_to_prepare())                       //-- (1)
    {
      buffer_data data = prepare_data();
      std::lock_guard<std::mutex> lock(mutex);         //-- (2)
      buffer.push(data);
      buffer_cond.notify_one();                        //-- (3)
    }
}

void data_processing_thread()
{
    while(true)
    {
      std::unique_lock<std::mutex> lock(mutex);                 //-- (4)
      buffer_cond.wait(lock, []{return !buffer.empty();});      //-- (5)
      buffer_data data = buffer.front();
      buffer.pop();
      lock.unlock();                                            //-- (6)
      process(data);
      if(is_last_data_entry(data))
          break;
    }
}

When data is ready for processing (1), the thread preparing the data locks the mutex (2) in order to protect the buffer while it adds the new values. It then calls the notify_one() method on the buffer_cond condition variable (3) to notify the thread waiting for data (if any) that the buffer contains data that can be processed.

The thread that processes the data from the buffer first locks the mutex, but this time using a std::unique_lock (4). The thread then calls the wait() method on the buffer_cond condition variable, passing as parameters the lock object and a lambda function that expresses the condition the thread is waiting for. Lambda functions are another feature introduced by the C++11 standard, allowing anonymous functions to be written as part of other expressions. In this case the lambda function []{return !buffer.empty();} is written inline in the source code and checks whether the buffer contains data that can be processed.

The wait() method first checks the condition by calling the lambda function it was given. If the condition is fulfilled (the lambda function returns true), wait() returns immediately. If it is not fulfilled, wait() unlocks the mutex and puts the thread into a blocked, waiting state. When the condition variable is notified by the call to notify_one() in data_preparation_thread(), the thread processing the data wakes up, locks the mutex again and rechecks the condition, leaving wait() with the mutex still locked if the condition is fulfilled. If the condition is not met, the thread unlocks the mutex and waits again.

This is why std::unique_lock is used: the thread that processes the data must unlock the mutex while waiting and then lock it again, and std::lock_guard doesn't provide this flexibility. If the mutex remained locked while the processing thread was blocked, the thread that prepares the data could not lock the mutex in order to insert new values into the buffer, and the condition of the processing thread would never be met.
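The behavior of wait() with a condition can be pictured as a loop around the plain wait(), which is also what protects the thread against wakeups that arrive while the condition is still false. The following is only a minimal sketch of that idea; the `ready` flag and the helper names `wait_until_ready`, `signal_ready` and `run_handshake` are hypothetical and not part of the buffer example above.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex ready_mutex;
std::condition_variable ready_cond;
bool ready = false;   // hypothetical shared flag, protected by ready_mutex

// Waits with an explicit loop; this has the same effect as
// ready_cond.wait(lock, []{ return ready; });
void wait_until_ready()
{
    std::unique_lock<std::mutex> lock(ready_mutex);
    while (!ready)               // recheck the condition after every wakeup
        ready_cond.wait(lock);   // unlocks while blocked, relocks on return
}

// Sets the flag under the lock, then notifies the waiting thread.
void signal_ready()
{
    {
        std::lock_guard<std::mutex> lock(ready_mutex);
        ready = true;
    }
    ready_cond.notify_one();
}

// Runs one waiter and one notifier; returns the final value of the flag.
bool run_handshake()
{
    std::thread waiter(wait_until_ready);
    std::thread notifier(signal_ready);
    waiter.join();
    notifier.join();
    std::lock_guard<std::mutex> lock(ready_mutex);
    return ready;
}
```

Because the flag is checked before the first call to wait(), the sketch also works when the notification is sent before the waiting thread reaches the condition variable.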

The flexibility to unlock a std::unique_lock object is not only used inside the call to wait(). It is also used once the data has been taken out of the buffer, but before it is processed (6). The buffer is only used to transfer data from one thread to another, so the mutex should not stay locked during data processing, which could be a time-consuming operation.

Futures

Another synchronization mechanism is the future, an asynchronous return object that reads a result from a state shared between threads. The C++11 Standard Library implements it through two class templates declared in the <future> header: unique futures (std::future<>) and shared futures (std::shared_future<>), modeled after std::unique_ptr and std::shared_ptr respectively.
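The difference between the two kinds of future can be sketched as follows: a std::future has a single owner, while copies of a std::shared_future can be handed to several threads that all read the same result. This is only an illustration under stated assumptions; `slow_answer` and `sum_from_two_readers` are hypothetical names, and the sketch already uses std::async, which is introduced below.

```cpp
#include <future>
#include <thread>

int slow_answer() { return 21; }   // hypothetical stand-in for a long computation

// Two threads read the same result through copies of one std::shared_future.
int sum_from_two_readers()
{
    // share() turns the unique future returned by std::async into a shared one.
    std::shared_future<int> shared =
        std::async(std::launch::async, slow_answer).share();

    int first = 0;
    int second = 0;
    // Each thread captures its own copy of the shared_future.
    std::thread reader_one([shared, &first]  { first  = shared.get(); });
    std::thread reader_two([shared, &second] { second = shared.get(); });
    reader_one.join();
    reader_two.join();
    return first + second;
}
```

Calling get() twice on a plain std::future is not allowed, which is why the shared variant is needed when more than one consumer waits for the same value.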

For example, suppose we have an operation that performs a very time-consuming calculation whose result is not needed immediately. In this case we can start a new thread to perform the operation in the background, but the result then has to be transferred back to the function that launched the thread, and the std::thread object does not include a mechanism for this. This is where the function template std::async, also declared in the <future> header, comes in.

The std::async function is used to launch an asynchronous operation whose result is not needed immediately. Instead of waiting for a std::thread object to complete its execution and deliver the result of the operation, std::async returns a std::future that will hold the result of the operation. When the result is needed, one can call the get() method on the std::future object, and the calling thread is blocked until the future is ready, meaning it can provide the result of the operation. For example:

#include <future>
#include <iostream>

int  long_time_computation();
void do_other_stuff();

int main()
{
   std::future<int> the_result = std::async(long_time_computation);

   do_other_stuff();

   std::cout << "The result is " << the_result.get() << std::endl;
}

The std::async function is a high-level utility which provides an asynchronous result and which internally takes care of creating an asynchronous provider and of making the shared state ready when the operation ends. This can be emulated with a std::packaged_task object (or std::bind and std::promise) and a std::thread, but using std::async is safer and easier.
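To make the comparison concrete, here is a rough sketch of how such an emulation with std::packaged_task and std::thread might look. It is not a description of how std::async is actually implemented; `run_in_background` is a hypothetical name, and `long_time_computation` is given a trivial stand-in body so that the sketch is complete.

```cpp
#include <future>
#include <thread>
#include <utility>

int long_time_computation() { return 6 * 7; }   // stand-in for real work

// Does by hand roughly what std::async(std::launch::async, ...) offers.
int run_in_background()
{
    std::packaged_task<int()> task(long_time_computation);
    std::future<int> the_result = task.get_future();   // obtain the future first

    std::thread worker(std::move(task));   // the task runs on the new thread
    int value = the_result.get();          // blocks until the task has finished
    worker.join();                         // thread lifetime is managed manually
    return value;
}
```

The manual version has to move the task into the thread and remember to join it, which is the bookkeeping that std::async hides.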

Packaged tasks

A std::packaged_task<> object connects a function or a callable object to a future. When the std::packaged_task<> object is called, it in turn calls the associated function or callable object and makes the future ready, with the value returned by the operation as its associated value. This mechanism can be used, for example, when each operation has to be executed by a separate thread or run sequentially on a background thread. If a large operation can be divided into several sub-operations, each of them can be wrapped in a std::packaged_task<> instance, which is then handed to an operations manager. The details of the operation are thus abstracted away, and the manager works only with std::packaged_task<> instances instead of individual functions. For example:

#include <cmath>
#include <functional>
#include <future>
#include <iostream>

int execute(int x, int y) { return std::pow(x,y); }

int main()
{
    std::packaged_task<int()> task(std::bind(execute, 2, 10));
    std::future<int> result = task.get_future();     //-- (1)

    task();  //-- (2)

    std::cout << "task_bind:\t" << result.get() << "\n"; //-- (3)
}

When the std::packaged_task object is called (2), the execute function associated with it is called with the bound parameters 2 and 10, and the result of the operation is stored in the shared state read by the std::future object (1). It is thus possible to encapsulate an operation in a std::packaged_task and to obtain the std::future that will contain its result before the std::packaged_task object is called. When the result of the operation is needed, it can be retrieved with get() once the std::future object is in the ready state (3).
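The scenario of sub-operations run sequentially on a background thread can be sketched as below. This is a minimal illustration, assuming a hypothetical sub-operation `square` and a hypothetical helper `sum_of_squares`; a real operations manager would normally use a thread-safe queue rather than a fixed vector.

```cpp
#include <future>
#include <thread>
#include <utility>
#include <vector>

int square(int x) { return x * x; }   // hypothetical sub-operation

// Wraps each sub-operation in a packaged_task, runs all of them
// sequentially on one background thread, and sums the results.
int sum_of_squares(int count)
{
    std::vector<std::packaged_task<int()>> tasks;
    std::vector<std::future<int>> results;

    for (int i = 1; i <= count; ++i)
    {
        std::packaged_task<int()> task([i] { return square(i); });
        results.push_back(task.get_future());   // keep the future on this side
        tasks.push_back(std::move(task));       // hand the task to the worker
    }

    // The worker only sees callable tasks, not what they compute.
    std::thread worker([&tasks] {
        for (auto& task : tasks)
            task();
    });

    int total = 0;
    for (auto& result : results)
        total += result.get();   // waits for the matching task to finish
    worker.join();
    return total;
}
```

For instance, sum_of_squares(3) adds 1, 4 and 9 as soon as the corresponding tasks have run on the worker thread.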

Promises

As we saw in the Futures section, data can be sent to a thread as parameters of the thread function, and the result can be obtained either through arguments passed by reference or through the future returned by std::async().

Another mechanism for transmitting the data produced by operations running on different threads is the std::promise/std::future pair. A std::promise<T> object provides a way to set a value of type T, which can then be read through a std::future<T> object. While the std::future object gives access to the result (using the get() method), the promise object is responsible for providing it (using one of the set_...() methods). For example:

#include <future>
#include <iostream>
#include <string>
#include <thread>

void execute(std::promise<std::string>& promise)
{
   std::string str("processed data");
   promise.set_value(std::move(str));	//-- (3)
}

int main()
{
    std::promise<std::string> promise; //-- (1)
    std::thread thread(execute, std::ref(promise)); //-- (2)
    std::future<std::string> result(promise.get_future()); //-- (4)
    std::cout << "result: " << result.get() << std::endl; //-- (5)
    thread.join(); // a joinable std::thread must be joined before it is destroyed
}

After including the <future> header, where the std::promise objects are declared, a promise object specialized for the value it must hold, std::string, is declared (1). Internally, the std::promise object creates a shared state, which stores the std::string value and which the std::future object uses to obtain this value as the result of the thread's operation.

This promise is then passed as a parameter to the function of a separate thread (2). The moment the value of the promise object is set inside the thread (3), the shared state becomes ready. In order to get the value set in the execute function, it is necessary to use a std::future object that shares the same state with the std::promise object (4). Once the future object has been created, its value can be obtained by calling the get() method (5). It is important to note that the current thread (the main thread) remains blocked until the shared state is ready, that is, until the set_value method has been executed (3) and the data is available.

The use of objects such as std::promise is not limited to multithreaded programming. They can also be used in single-threaded applications, in order to keep a value or an exception to be processed later through a std::future.
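The single-threaded case with an exception can be sketched as follows. The function name `deferred_error_message` and the error text are hypothetical; the sketch only illustrates that an exception stored with set_exception() is rethrown later by get().

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <string>

// Stores an exception in a promise and retrieves it later through the
// future, all on one thread. Returns the message that was carried over.
std::string deferred_error_message()
{
    std::promise<int> promise;
    std::future<int> result = promise.get_future();

    try
    {
        throw std::runtime_error("computation failed");
    }
    catch (...)
    {
        // Keep the exception in the shared state instead of handling it now.
        promise.set_exception(std::current_exception());
    }

    try
    {
        result.get();   // rethrows the stored exception
    }
    catch (const std::runtime_error& error)
    {
        return error.what();
    }
    return "no error";
}
```

The same pattern applies across threads: the thread that owns the promise stores the exception, and the thread that calls get() on the future is the one that sees it thrown.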

Atomics

In addition to the mutual exclusion mechanisms above, the C++11 Standard also introduces atomic types.

An atomic type std::atomic<T> can be instantiated with any trivially copyable type T and ensures that every operation on the std::atomic<T> object is atomic, that is, it is executed in its entirety or not at all.

One of the advantages of using atomic types instead of mutexes is performance: atomic operations on common integral and pointer types are typically implemented with a lock-free technique, which is usually much cheaper than a mutex, whose locking can be relatively expensive in terms of resources and latency.

The main operations provided by the std::atomic class are the store and load functions, which atomically set and return the value stored in the std::atomic object. Another method specific to these objects is the exchange function, which sets a new value for the atomic object while returning the previously stored value. There are also two more methods, compare_exchange_weak and compare_exchange_strong, which perform an atomic change only if the current value is equal to the expected value. These last two functions can be used to implement lock-free algorithms. For example:

#include <atomic>
#include <iostream>
#include <thread>

std::atomic<int> counter(0); //-- (1)

void increment()
{
    for(int i = 0; i < 1000; ++i)
        ++counter; //-- (2)
}

int main()
{
    std::thread first(increment);
    std::thread second(increment);
    first.join();
    second.join();
    std::cout << "counter: " << counter.load() << std::endl;
}

In this example the <atomic> header, where the class template std::atomic<> is declared, is included first. Then an atomic counter object is declared (1). Basically, one can use any trivially copyable type, such as an integral or pointer type, as a parameter for the template. Note, however, the initialization of the std::atomic object: it must always be initialized explicitly, because the default constructor does not initialize it completely. Unlike the example presented in the Mutex section, in this case the counter variable can be incremented directly, without the need for a mutex (2), because both the member functions of the std::atomic object and simple operations such as assignment, implicit conversion, increment and decrement are guaranteed to run atomically.

It is advisable to use atomic types when one wants to use atomic operations, especially on integral types.
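The compare-and-exchange functions mentioned above are usually called in a retry loop. The sketch below shows one possible use, keeping a shared maximum up to date without a mutex; the variable `highest` and the functions `update_highest` and `highest_of_two_threads` are hypothetical names chosen for illustration.

```cpp
#include <atomic>
#include <thread>

std::atomic<int> highest(0);   // hypothetical shared maximum

// Raises `highest` to `candidate` if candidate is larger, without a mutex.
void update_highest(int candidate)
{
    int current = highest.load();
    // When compare_exchange_weak fails, it reloads `current` with the value
    // actually stored, so the loop re-evaluates the condition and retries.
    while (candidate > current &&
           !highest.compare_exchange_weak(current, candidate))
    {
    }
}

// Lets two threads compete to publish their value; returns the maximum.
int highest_of_two_threads()
{
    std::thread first(update_highest, 7);
    std::thread second(update_highest, 12);
    first.join();
    second.join();
    return highest.load();
}
```

compare_exchange_weak is allowed to fail even when the stored value matches the expected one, which is harmless inside a loop like this; compare_exchange_strong avoids such failures and is the simpler choice when no loop is needed.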

Conclusions

In the previous sections we have outlined how threads can be used in the C++11 Standard, covering both thread management and the mechanisms used to synchronize data and operations: mutexes, condition variables, futures, promises, packaged tasks and atomic types. As can be seen, using threads from the C++ Standard Library is not difficult, and it relies on basically the same mechanisms as the threads from the Boost library. However, the difficulty grows with the complexity of the code design, which must still behave as expected. For a better grasp of the topics above and to expand your knowledge of the new concepts available in the C++11 Standard, I highly recommend the book by Anthony Williams, C++ Concurrency in Action, and the latest edition of the classic The C++ Standard Library, by Nicolai Josuttis. You will find there not only a breakdown of the topics presented above, but also other new features specific to the C++11 Standard, including techniques for using them in multithreaded programming at an advanced level.