C++ Latches and Barriers

ISO/IEC JTC1 SC22 WG21 N3666 - 2013-04-18

Alasdair Mackintosh, alasdair@google.com, alasdair.mackintosh@gmail.com

Introduction
Solution
    Latch Operations
    Barrier Operations
    Sample Usage
Synopsis
Wording

Introduction

Certain idioms that are commonly used in concurrent programming are missing from the standard libraries. Although many of these can be relatively straightforward to implement, we believe it is more efficient to have a standard version.

In addition, although some idioms can be provided using mutexes, higher performance can often be obtained with atomic operations and lock-free algorithms. However, these algorithms are more complex to write, and are prone to error.

Other standard concurrency idioms may have difficult corner cases, and can be hard to implement correctly. For these reasons, we believe that it is valuable to provide these in the standard library.

Solution

We propose a set of commonly-used concurrency classes, some of which may be implemented using efficient lock-free algorithms where appropriate. This paper describes the latch and barrier classes.

A latch is a thread co-ordination mechanism that allows one or more threads to block until an operation is completed. An individual latch is a single-use object; once the operation has been completed, it cannot be re-used.

A barrier is a thread co-ordination mechanism that allows multiple threads to block until an operation is completed. Unlike a latch, a barrier is re-usable; once the operation has been completed, the threads can re-use the same barrier. It is thus useful for managing repeated tasks handled by multiple threads.

A reference implementation of these two classes has been written.

Latch Operations

A latch maintains an internal counter that is initialized when the latch is created. One or more threads may block waiting until the counter is decremented to 0.

constructor( size_t );

The parameter is the initial value of the internal counter.

destructor( );

Destroys the latch. If the latch is destroyed while other threads are in wait(), or are invoking count_down(), the behaviour is undefined.

void count_down( );

Decrements the internal count by 1, and returns. If the count reaches 0, any threads blocked in wait() will be released.

Throws std::logic_error if the internal count is already 0.

void wait( );

Blocks the calling thread until the internal count is decremented to 0 by one or more other threads calling count_down(). If the count is already 0, this is a no-op.

bool try_wait( );

Returns true if the internal count has been decremented to 0 by one or more other threads calling count_down(), and false otherwise. Does not block the calling thread.

void count_down_and_wait( );

Decrements the internal count by 1. If the resulting count is not 0, blocks the calling thread until the internal count is decremented to 0 by one or more other threads calling count_down().

There are no copy or assignment operations.
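
The operations above can be sketched with a mutex and a condition variable. This is a minimal illustration under stated assumptions, not the reference implementation: the names simple_latch and run_workers are hypothetical, and a production version might well use atomic operations instead.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical sketch of the proposed latch interface.
class simple_latch {
 public:
  explicit simple_latch(std::size_t count) : count_(count) {}
  simple_latch(const simple_latch&) = delete;
  simple_latch& operator=(const simple_latch&) = delete;

  void count_down() {
    std::lock_guard<std::mutex> lock(mutex_);
    decrement_locked();
  }

  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    released_.wait(lock, [this] { return count_ == 0; });
  }

  bool try_wait() {
    std::lock_guard<std::mutex> lock(mutex_);
    return count_ == 0;
  }

  void count_down_and_wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    decrement_locked();
    released_.wait(lock, [this] { return count_ == 0; });
  }

 private:
  // Caller must hold mutex_.
  void decrement_locked() {
    if (count_ == 0) throw std::logic_error("latch count is already 0");
    if (--count_ == 0) released_.notify_all();
  }

  std::mutex mutex_;
  std::condition_variable released_;
  std::size_t count_;
};

// Starts `workers` threads that each count down once. Returns how many
// workers had finished by the time the waiting thread was released.
std::size_t run_workers(std::size_t workers) {
  simple_latch done(workers);
  std::vector<int> finished(workers, 0);
  std::vector<std::thread> threads;
  for (std::size_t i = 0; i < workers; ++i) {
    threads.emplace_back([&done, &finished, i] {
      finished[i] = 1;  // stands in for the real work
      done.count_down();
    });
  }
  done.wait();  // blocks until every worker has counted down
  assert(done.try_wait());
  std::size_t total = 0;
  for (int f : finished) total += f;
  // Join before `done` is destroyed; destroying a latch in use is undefined.
  for (std::thread& t : threads) t.join();
  return total;
}
```

Under these assumptions, run_workers(4) returns 4, and a further count_down() on a latch whose count is already 0 throws std::logic_error.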

Memory Ordering

All calls to count_down() synchronize with any thread returning from wait(). All calls to count_down() synchronize with any thread that gets a true value from try_wait().
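
One way to obtain this ordering without a mutex is a release decrement paired with an acquire load. The sketch below is an assumption about how a lock-free latch might be written, not the reference implementation; spin_latch and publish_and_read are hypothetical names, the waiter spins rather than blocks, and the std::logic_error check is omitted for brevity.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical spin-waiting latch built on a single atomic counter.
class spin_latch {
 public:
  explicit spin_latch(std::size_t count) : count_(count) {}

  void count_down() {
    // Release: writes made before this call become visible to any thread
    // that later observes the count at 0 through an acquire load.
    count_.fetch_sub(1, std::memory_order_release);
  }

  bool try_wait() const {
    return count_.load(std::memory_order_acquire) == 0;
  }

  void wait() const {
    while (!try_wait()) std::this_thread::yield();
  }

 private:
  std::atomic<std::size_t> count_;
};

// The calling thread publishes `value` through a plain int and then opens
// the latch. Returns the sum of the values the reader threads observed.
int publish_and_read(int value, int readers) {
  spin_latch start(1);
  int shared = 0;  // deliberately not atomic
  std::vector<int> seen(readers, -1);
  std::vector<std::thread> threads;
  for (int i = 0; i < readers; ++i) {
    threads.emplace_back([&, i] {
      start.wait();      // acquire side
      seen[i] = shared;  // ordered after the write to `shared` below
    });
  }
  shared = value;      // plain write ...
  start.count_down();  // ... published by the release decrement
  assert(start.try_wait());
  for (std::thread& t : threads) t.join();
  int sum = 0;
  for (int s : seen) sum += s;
  return sum;
}
```

Because every reader returns from wait() only after observing the decrement, each one reads the published value; publish_and_read(7, 3) returns 21.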

Barrier Operations

A barrier maintains an internal thread counter that is initialized when the barrier is created. Threads may decrement the counter and then block waiting until the specified number of threads are blocked. All threads will then be woken up, and the barrier will reset. In addition, there is a mechanism to change the thread count value after the count reaches 0.

constructor( size_t );

The parameter is the initial value of the internal thread counter.

Throws std::invalid_argument if the specified count is 0.

constructor( size_t, std::function<void()> );

The parameters are the initial value of the internal thread counter, and a function that will be invoked when the counter reaches 0.

Throws std::invalid_argument if the specified count is 0.

constructor( size_t, std::function<size_t()> );

The parameters are the initial value of the internal thread counter, and a function that will be invoked when the counter reaches 0. The return value from the function will be used to reset the internal thread counter.

Throws std::invalid_argument if the specified count is 0.

destructor( );

Destroys the barrier. If the barrier is destroyed while other threads are in count_down_and_wait(), the behaviour is undefined.

void count_down_and_wait( );

Decrements the internal thread count by 1. If the resulting count is not 0, blocks the calling thread until the internal count is decremented to 0 by one or more other threads calling count_down_and_wait().

Before any threads are released, the completion function registered in the constructor will be invoked (if specified and non-NULL). Note that the completion function may be invoked in the context of one of the blocked threads. When the completion function returns, the internal thread count will be reset, and all blocked threads will be unblocked. If the barrier was created with a void function, then the internal thread count will be reset to the initial thread count specified in the constructor. If the barrier was created with a function returning size_t, then the thread count will be reset to the function's return value. It is illegal to return 0.

Note that it is safe for a thread to re-enter count_down_and_wait() immediately. It is not necessary to ensure that all blocked threads have exited count_down_and_wait() before one thread re-enters it.

There are no copy or assignment operations.

Note that the barrier does not have separate count_down() and wait() methods. Every thread that counts down will then block until all threads have counted down. Hence only the count_down_and_wait() method is supported.
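
The immediate re-entry guarantee described above can be met with a generation counter: each waiter remembers the generation it arrived in and sleeps until that value changes, so a fast thread that re-enters cannot be confused with a slow thread that has not yet woken. The following is a minimal sketch, assuming a mutex-based design; simple_barrier and run_rounds are hypothetical names, and only the void completion flavour is shown.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch of the proposed barrier with a void completion.
class simple_barrier {
 public:
  explicit simple_barrier(std::size_t num_threads,
                          std::function<void()> completion = nullptr)
      : initial_(num_threads),
        count_(num_threads),
        completion_(std::move(completion)) {
    if (num_threads == 0) throw std::invalid_argument("thread count is 0");
  }
  simple_barrier(const simple_barrier&) = delete;
  simple_barrier& operator=(const simple_barrier&) = delete;

  void count_down_and_wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    assert(count_ > 0);
    const std::size_t my_generation = generation_;
    if (--count_ == 0) {
      // Last arrival: run the completion before anyone is released.
      if (completion_) completion_();
      count_ = initial_;  // reset for the next cycle
      ++generation_;
      released_.notify_all();
    } else {
      released_.wait(lock, [&] { return generation_ != my_generation; });
    }
  }

 private:
  const std::size_t initial_;
  std::size_t count_;
  std::size_t generation_ = 0;
  std::function<void()> completion_;
  std::mutex mutex_;
  std::condition_variable released_;
};

// Runs `threads` workers through `rounds` barrier cycles and returns how
// many times the completion function was invoked.
int run_rounds(std::size_t threads, int rounds) {
  int completions = 0;  // only touched inside the completion function
  simple_barrier barrier(threads, [&completions] { ++completions; });
  std::vector<std::thread> pool;
  for (std::size_t i = 0; i < threads; ++i) {
    pool.emplace_back([&barrier, rounds] {
      // Each iteration re-enters the barrier immediately after release.
      for (int r = 0; r < rounds; ++r) barrier.count_down_and_wait();
    });
  }
  for (std::thread& t : pool) t.join();
  return completions;
}
```

In this sketch the completion function runs once per cycle in the last thread to arrive, so run_rounds(4, 5) returns 5.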

Memory Ordering

For threads X and Y that call count_down_and_wait(), the call to count_down_and_wait() in X synchronizes with the return from count_down_and_wait() in Y.
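
In practice this means that data written by any thread before it arrives at the barrier can be read by every thread after it leaves. The sketch below illustrates that with a spinning, atomics-based barrier; it is one possible lock-free design chosen for illustration, not the reference implementation. The names spin_barrier and all_writes_visible are hypothetical, and completion functions are left out.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical spinning barrier: a countdown plus a generation number.
class spin_barrier {
 public:
  explicit spin_barrier(std::size_t n) : n_(n), remaining_(n), generation_(0) {}

  void count_down_and_wait() {
    const std::size_t gen = generation_.load(std::memory_order_acquire);
    // acq_rel: publishes this thread's earlier writes and, for the last
    // arrival, acquires the writes of every thread that arrived before it.
    if (remaining_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      remaining_.store(n_, std::memory_order_relaxed);      // reset
      generation_.fetch_add(1, std::memory_order_release);  // release all
    } else {
      while (generation_.load(std::memory_order_acquire) == gen) {
        std::this_thread::yield();
      }
    }
  }

 private:
  const std::size_t n_;
  std::atomic<std::size_t> remaining_;
  std::atomic<std::size_t> generation_;
};

// Each thread writes its own slot, crosses the barrier, then reads every
// slot. Returns true if every thread saw every write.
bool all_writes_visible(std::size_t n) {
  spin_barrier barrier(n);
  std::vector<int> slots(n, 0);  // plain ints, no atomics
  std::vector<int> sums(n, 0);
  std::vector<std::thread> pool;
  for (std::size_t i = 0; i < n; ++i) {
    pool.emplace_back([&, i] {
      slots[i] = static_cast<int>(i) + 1;  // written before the barrier
      barrier.count_down_and_wait();
      int sum = 0;
      for (int s : slots) sum += s;        // read after the barrier
      sums[i] = sum;
    });
  }
  for (std::thread& t : pool) t.join();
  const int expected = static_cast<int>(n * (n + 1) / 2);
  for (int s : sums) {
    if (s != expected) return false;
  }
  return true;
}
```

The plain writes to slots need no further protection here because each arrival is ordered before every return from the same cycle.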

Sample Usage

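Sample use cases for the latch include waiting for a set of tasks to complete, and holding a set of threads at a common point until a signal is given. An example of the first use case would be as follows: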

  void DoWork(threadpool* pool) {
    latch completion_latch(NTASKS);
    for (int i = 0; i < NTASKS; ++i) {
      pool->add_task([&] {
        // perform work
        ...
        completion_latch.count_down();
      });
    }
    // Block until work is done
    completion_latch.wait();
  }

An example of the second use case is shown below. We need to load data and then process it using a number of threads. Loading the data is I/O bound, whereas starting threads and creating data structures is CPU bound. By running these in parallel, throughput can be increased.

  void DoWork() {
    latch start_latch(1);
    vector<thread*> workers;
    for (int i = 0; i < NTHREADS; ++i) {
      workers.push_back(new thread([&] {
        // Initialize data structures. This is CPU bound.
        ...
        start_latch.wait();
        // perform work
        ...
      }));
    }
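    // Load input data. This is I/O bound.
    ...
    // Threads can now start processing
    start_latch.count_down();
    // Wait for the workers before start_latch goes out of scope
    for (thread* worker : workers) {
      worker->join();
      delete worker;
    }
  }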

The barrier can be used to co-ordinate a set of threads carrying out a repeated task. The number of threads can be adjusted dynamically to respond to changing requirements.

In the example below, a number of threads are performing a multi-stage task. Some tasks may require fewer steps than others, meaning that some threads may finish before others. We reduce the number of threads waiting on the barrier when this happens.



  void DoWork(Tasks& tasks, size_t initial_threads) {
    atomic<size_t> current_threads(initial_threads);
    vector<thread*> workers;

    // Create a barrier, and set a lambda that will be invoked every time the
    // barrier counts down. If one or more active threads have completed,
    // reduce the number of threads.
    std::function<size_t()> rf = [&] { return current_threads.load(); };
    barrier task_barrier(initial_threads, rf);

    for (size_t i = 0; i < initial_threads; ++i) {
      workers.push_back(new thread([&] {
        bool active = true;
        while (active) {
          Task task = tasks.get();
          // perform task
          ...
          if (finished(task)) {
            current_threads--;
            active = false;
          }
          task_barrier.count_down_and_wait();
        }
      }));
    }

    // Read each stage of the task until all stages are complete.
    while (!finished()) {
      GetNextStage(tasks);
    }
  }
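
A self-contained variant of this pattern is sketched below for experimentation. Everything in it is an assumption made for illustration: shrinking_barrier is a hypothetical mutex-based barrier that supports only a non-null size_t completion, run_stages is a hypothetical driver, and the task is reduced to a per-thread stage count. Since returning 0 from the completion function is illegal, the sketch returns 1 in the final cycle, after which the barrier is no longer used.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical barrier whose completion function chooses the next count.
class shrinking_barrier {
 public:
  shrinking_barrier(std::size_t num_threads,
                    std::function<std::size_t()> completion)
      : count_(num_threads), completion_(std::move(completion)) {
    if (num_threads == 0) throw std::invalid_argument("thread count is 0");
  }

  void count_down_and_wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    assert(count_ > 0);
    const std::size_t my_generation = generation_;
    if (--count_ == 0) {
      const std::size_t next = completion_();  // runs before any release
      if (next == 0) throw std::logic_error("completion returned 0");
      count_ = next;
      ++generation_;
      released_.notify_all();
    } else {
      released_.wait(lock, [&] { return generation_ != my_generation; });
    }
  }

 private:
  std::size_t count_;
  std::size_t generation_ = 0;
  std::function<std::size_t()> completion_;
  std::mutex mutex_;
  std::condition_variable released_;
};

// Thread i performs steps[i] stages; `steps` must be non-empty and every
// entry at least 1. Returns the number of barrier cycles that completed.
int run_stages(const std::vector<int>& steps) {
  std::atomic<std::size_t> active(steps.size());
  int cycles = 0;  // only touched inside the completion function
  shrinking_barrier barrier(steps.size(), [&]() -> std::size_t {
    ++cycles;
    const std::size_t n = active.load();
    return n == 0 ? std::size_t(1) : n;  // never return 0
  });
  std::vector<std::thread> pool;
  for (std::size_t i = 0; i < steps.size(); ++i) {
    pool.emplace_back([&, i] {
      for (int done = 1;; ++done) {
        const bool finished = (done == steps[i]);
        if (finished) --active;  // drop out of later cycles
        barrier.count_down_and_wait();
        if (finished) break;
      }
    });
  }
  for (std::thread& t : pool) t.join();
  return cycles;
}
```

With stage counts {1, 2, 3} the barrier is crossed by three, then two, then one thread, so run_stages({1, 2, 3}) returns 3.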

Synopsis

The synopsis is as follows.


class latch {
 public:
  explicit latch(size_t count);
  ~latch();

  void count_down();

  void wait();

  bool try_wait();

  void count_down_and_wait();
};

class barrier {
 public:
  explicit barrier(size_t num_threads) throw (std::invalid_argument);
  barrier(size_t num_threads, std::function<void()> completion) throw (std::invalid_argument);
  barrier(size_t num_threads, void (*completion)()) throw (std::invalid_argument);

  barrier(size_t num_threads, std::function<size_t()> completion) throw (std::invalid_argument);
  barrier(size_t num_threads, size_t (*completion)()) throw (std::invalid_argument);
  ~barrier();

  void count_down_and_wait();

};

Wording

The wording in this section is relative to N3242.

30.7 Thread coordination [thread.coordination]

This section provides mechanisms for thread coordination: latches and barriers. These mechanisms allow multiple threads to block until a given operation has completed.

30.7.1 Latches [thread.latch]

A latch is a thread co-ordination mechanism that allows one or more threads to block until an operation is completed. An individual latch is a single-use object; once the operation has been completed, it cannot be re-used.

30.7.1.1 Class latch [thread.latch]


namespace std {
  class latch {
  public:
    explicit latch(size_t count);
    ~latch();

    void count_down();

    void wait();

    bool try_wait();

    void count_down_and_wait();
  };
}

The class latch provides a single-use thread coordination mechanism. An individual latch instance maintains an internal counter that is initialized when the latch is created. One or more threads may block waiting until the counter is decremented to 0.

latch(size_t count);

Effects: Constructs an object of type latch. The parameter is the initial value of the internal counter.

Throws: Nothing.

~latch();

Effects: Destroys the latch.

Requires: There are no threads currently calling wait(), count_down() or count_down_and_wait(). If there are, the behaviour is undefined.

Throws: Nothing.

void count_down();

Effects: Decrements the internal count by 1, and returns. If the count reaches 0, any threads blocked in wait() will be released.

Synchronization: All calls to count_down() synchronize with any thread returning from wait(). All calls to count_down() synchronize with any thread that obtains a true return value from try_wait().

Throws: std::logic_error if the internal count is already 0. system_error when an exception is required.

void wait();

Effects: Blocks the calling thread until the internal count is decremented to 0 by one or more other threads calling count_down(). If the count is already 0, returns immediately.

Synchronization: See count_down().

Throws: system_error when an exception is required.

bool try_wait();

Effects: Returns true if the internal count has been decremented to 0 by one or more other threads calling count_down(), and false otherwise. Does not block the calling thread.

Synchronization: See count_down().

Throws: Nothing.

void count_down_and_wait();

Effects: Decrements the internal count by 1. If the resulting internal count is not 0, blocks the calling thread until the internal count is decremented to 0 by one or more other threads calling count_down() or count_down_and_wait().

Synchronization: All calls to count_down_and_wait() synchronize with any thread returning from wait(). All calls to count_down_and_wait() synchronize with any thread that obtains a true return value from try_wait().

Throws: std::logic_error if the internal count is already 0. system_error when an exception is required.

30.7.2 Barriers [thread.barrier]

A barrier is a thread co-ordination mechanism that allows one or more threads to block until an operation is completed. A barrier is re-usable; once the operation has been completed, it can be used again to block until a subsequent set of operations has completed.

30.7.2.1 Class barrier [thread.barrier]


class barrier {
 public:
  explicit barrier(size_t num_threads) throw (std::invalid_argument);
  barrier(size_t num_threads, std::function<void()> completion) throw (std::invalid_argument);
  barrier(size_t num_threads, void (*completion)()) throw (std::invalid_argument);

  barrier(size_t num_threads, std::function<size_t()> completion) throw (std::invalid_argument);
  barrier(size_t num_threads, size_t (*completion)()) throw (std::invalid_argument);

  ~barrier();

  void count_down_and_wait();

};

The class barrier provides a re-usable coordination mechanism. An individual barrier maintains an internal thread counter that is initialized when the barrier is created. One or more threads may decrement the counter and then block waiting until the specified number of threads are similarly blocked. All threads will then be unblocked, and the barrier will reset. While resetting the barrier, there is a mechanism to change the internal thread count value.

barrier(size_t num_threads);

Effects: Constructs an object of type barrier. The parameter is the initial value of the internal counter.

Throws: std::invalid_argument if the specified count is 0.

barrier(size_t num_threads, std::function<void()> completion);

Effects: Constructs an object of type barrier. The parameters are the initial value of the internal counter, and a std::function completion that will be invoked when the internal count reaches 0. If the function argument is NULL, it will be ignored.

Throws: std::invalid_argument if the specified count is 0.

barrier(size_t num_threads, void (*completion)());

Effects: Constructs an object of type barrier. The parameters are the initial value of the internal counter, and a completion function that will be invoked when the internal count reaches 0. If the function argument is NULL, it will be ignored.

Throws: std::invalid_argument if the specified count is 0.

barrier(size_t num_threads, std::function<size_t()> completion);

Effects: Constructs an object of type barrier. The parameters are the initial value of the internal counter, and a std::function completion function that will be invoked when the internal count reaches 0. If the function argument is NULL, it will be ignored.

Throws: std::invalid_argument if the specified count is 0.

barrier(size_t num_threads, size_t (*completion)());

Effects: Constructs an object of type barrier. The parameters are the initial value of the internal counter, and a completion function that will be invoked when the internal count reaches 0. If the function argument is NULL, it will be ignored.

Throws: std::invalid_argument if the specified count is 0.

~barrier();

Effects: Destroys the barrier.

Requires: There are no threads currently calling count_down_and_wait(). If there are, the behaviour is undefined.

Throws: Nothing.

void count_down_and_wait();

Effects: Decrements the internal thread count by 1. If the resulting count is not 0, blocks the calling thread until the internal count is decremented to 0 by one or more other threads calling count_down_and_wait(). Before any threads are unblocked, the completion function parameter passed to the constructor (if specified and non-NULL) will be invoked. When the completion function returns, the internal thread count will be reset, and all blocked threads will be unblocked. If the barrier was created with a void completion function, then the internal thread count will be reset to the initial thread count specified in the constructor. If the barrier was created with a completion function returning size_t, then the thread count will be reset to the function's return value. It is illegal to return 0, and a std::logic_error will be thrown in the completion function's thread if this occurs.

Note: It is safe for a thread to re-enter count_down_and_wait() immediately. It is not necessary to ensure that all blocked threads have exited count_down_and_wait() before one thread re-enters it.

Note: The thread in which the completion function is invoked is implementation dependent. It may be invoked in one of the threads that has called count_down_and_wait().

Synchronization: All calls to count_down_and_wait() synchronize with any thread returning from count_down_and_wait(). The invocation of the completion function synchronizes with any thread that has called count_down_and_wait().

Throws: std::logic_error in the thread of the completion function if the completion function returns 0. system_error when an exception is required.