Synchronization: Condition Variables

2021/08/31

C++ provides a number of powerful utilities for thread synchronization. In this post, I will discuss the condition variables primitive.

What Problem Are We Trying To Solve?

Synchronization implies that there are multiple things occurring at the same time, and we would like to make sure that one thing happens before another. Imagine we have a car wash. When a car arrives, the driver pays (he notifies us). We check that we have been paid (the condition has been satisfied), and then the wash cycle begins. When there’s no cars, we can simply wait — perhaps by taking a nap.

Condition variables work somewhat like this. A condition variable waits until some condition is satisfied. We can also tell the condition variable that the condition has been satisfied (which wakes it up). These are efficient for synchronization since the CPU can sleep until the condition has been met (as opposed to a busy wait).

As an aside, synchronization problems are often rife with analogies (like our car wash), mostly because synchronization is both frequent within our lives and difficult to talk about in purely technical terms. I will make liberal use of analogies to help aide with explanations.

It is critically important to know the requirements of the synchronization problem (that is, we need to get our analogy right). Condition variables are powerful enough to solve many synchronization problems where we wait on some condition. However, futures are simpler primitives that are somewhat easier to reason about. Getting the right analogy allows us to properly align a given problem with the correct technical solution.

Condition Variables

I have often seen condition variables as the “one size fits all” solution to synchronization problems. They are used whenever you want to wait on some condition. That condition could happen many times (say, new data becomes available for processing regularly). There can also be multiple threads waiting on that data (i.e. a condition variable supports multiple concurrent handlers).

Consider we have a sensor that periodically reads a value. As we read the value, we would like to do some processing (say, we would like to post it to the screen). Let’s call the thread that produces data the producer and the thread that consumes data the consumer. In a naïve approach, a shared variable is used to store the most recent sensor reading by the producer. The consumer will read from this variable. The producer writes data as quickly as it can, while the consumer reads data as fast as it can.

// Think of this as a data generating activity, like reading from a sensor or
// making a query elsewhere. 
int getData()
{
    static int i = 0;
    return i++;
}


void noSynchronization()
{
    constexpr int COUNT = 10;
    int shared = 0;

    std::thread producer([&shared](){
        for (size_t i = 0; i < COUNT; ++i)
        {
            shared = getData();
        }
    });

    std::thread consumer([&shared](){
    for (size_t i = 0; i < COUNT; ++i)
        {
            std::cout << shared << '\n';
        }
    });

    producer.join();
    consumer.join();
}

There are some problems with this approach:

  1. We do not guarantee that each piece of data from the producer is actually processed by the consumer. For some systems, that may not be a problem (a temperature reading may simply need to have the most recent temperature value, without any processing). However, in another system, this is a fatal flaw (consider where the shared data may be something like an update package, where missing any data leaves the system unable to operate).
  2. There is no synchronization, so partial reads and writes of shared are possible. In practice, this may not be a problem for data types that are only a few bytes (that is, they fit within one read and write operation). However, for anything larger than that, this can become problematic as reads and writes are not atomic.

Let’s solve this problem using a condition variable. In this case, the condition is that new data is available. In synchronization parlance, this means that the consumer will wait on the condition variable, and the producer will post to the condition variable. (In general, it is best not to assume that our types will be treated atomic way unless we have declared them as such.)

Condition variables come in two parts:

  1. A std::condition_variable, which signals that some condition has been met.
  2. A std::mutex, which protects some shared data.

On the producer side, we will:

  1. Lock the mutex that protects the shared data.
  2. Access the shared data.
  3. Unlock the mutex.
  4. Post the condition variable.

On the consumer side, we will:

  1. Lock the mutex that protects the shared data.
  2. Wait on the condition variable and pass the mutex to it. While the condition variable is waiting, it releases the mutex. Once the wait is complete, we hold the mutex!
// Think of this as a data generating activity, like reading from a sensor or
// making a query elsewhere. 
int getData()
{
    static int i = 0;
    return i++;
}


void synchronization()
{
    constexpr int COUNT = 10;
    std::queue<int> shared;
    std::mutex mtx;
    std::condition_variable cv;

    std::thread producer([&shared, &mtx, &cv](){
        for (size_t i = 0; i < COUNT; ++i)
        {
            {
                std::lock_guard<std::mutex> lk(mtx);
                shared.push(getData());
            } // Out-of-scope: Release the mutex
            cv.notify_one();
        }
    });

    std::thread consumer([&shared, &mtx, &cv](){
    for (size_t i = 0; i < COUNT; ++i)
        {
            std::unique_lock<std::mutex> lk(mtx);
            // Note: The lock is released during a wait()
            cv.wait(lk,
                [&shared]{ return !shared.empty(); });
            // Hold the mutex once the wait is complete
            std::cout << shared.front() << '\n';
            shared.pop();
        }
    });

    producer.join();
    consumer.join();
}

The Spurious Wake

A condition variable can be woken at any time. This is counter-intuitive if you haven’t seen it before, and (poor) C++ concurrency code will often disregard spurious wakes.

There is a simple mechanism for handling this. The condition that is being waited on can be checked via a predicate. In this case, recall that our condition is that new data is available. To do this, we change shared from a simple int to a std::queue, and check whether the queue is empty using a lambda.

The use of a queue allows us to have an empty state (i.e. one where there is no data available). This is only one of many ways to indicate that there is nothing present, but unlike other methods, it also allows multiple data points to be queued up, if we ever find ourselves in such a position.

When a Mutex is Held

A mutex is held by std::condition_variable::wait only when:

  1. The condition predicate is being checked.
  2. The wait has been satisfied.

Otherwise, the condition variable releases the mutex during the wait.

Since a condition variable releases the mutex, the mutex must be held by a std::unique_lock rather than a std::lock_guard. (As a simple rule-of-thumb, use std::lock_guard when the mutex is cleaned up via RAII and does not require manual locking or unlocking. If manual control is required, use std::unique_lock.)

Multiple Consumers

The description above has only a single consumer, and only one is ever notified via the std::condition_variable::notify_one.

Multiple consumers can exist to handle the data. If this is the case, then any one of the threads whose condition variable is in the waiting state can be called. If any condition variable’s condition predicate is satisfied, then it will unblock (and no other condition variables will be called).

Alternatively, multiple consumers may each do something different, and so rather than notifying one, we would like to notify them all. For example, we may have a synchronization point where several actions (each on their own thread) are triggered once a certain condition is reached. This is achieved via std::condition_variable::notify_all.

Waiting Forever

Rarely do we want to wait forever. The wait variant std::condition_variable::wait_for allows us to specify a timeout, which can be useful to produce richer error-handling.

Alternatives

A condition variable assumes that there is some protected and shared data, and that the condition will be met multiple times.

>> Home