ISO/IEC JTC1 SC22 WG21 N3666 - 2013-04-18
Alasdair Mackintosh, alasdair@google.com, alasdair.mackintosh@gmail.com
Introduction
Solution
Latch Operations
Barrier Operations
Sample Usage
Synopsis
Wording
Certain idioms that are commonly used in concurrent programming are missing from the standard libraries. Although many of these these can be relatively straightforward to implement, we believe it is more efficient to have a standard version.
In addition, although some idioms can be provided using mutexes, higher performance can often be obtained with atomic operations and lock-free algorithms. However, these algorithms are more complex to write, and are prone to error.
Other standard concurrency idioms may have difficult corner cases, and can be hard to implement correctly. For these reasons, we believe that it is valuable to provide these in the standard library.
We propose a set of commonly-used concurrency classes, some of which may be implemented using efficient lock-free algorithms where appropriate. This paper describes the latch and barrier classes.
Latches are a thread co-ordination mechanism that allow one or more threads to block until an operation is completed. An individual latch is a single-use object; once the operation has been completed, it cannot be re-used.
Barriers are a thread co-ordination mechanism that allow multiple threads to block until an operation is completed. Unlike a latch, a barrier is re-usable; once the operation has been completed, the threads can re-use the same barrier. It is thus useful for managing repeated tasks handled by multiple threads.
A reference implementation of these two classes has been written.
constructor( size_t );
The parameter is the initial value of the internal counter.
destructor( );
Destroys the latch. If the latch is destroyed while other threads are in
wait()
, or are invoking count_down()
, the behaviour
is undefined.
void count_down( );
Decrements the internal count by 1, and returns. If the count reaches 0, any
threads blocked in wait()
will be released.
Throws std::logic_error
if the internal count is already 0.
void wait( );
Blocks the calling thread until the internal count is decremented to 0 by one
or more other threads calling count_down()
. If the count is
already 0, this is a no-op.
bool try_wait( );
Returns true if the internal count has been decremented to 0 by one
or more other threads calling count_down()
, and false otherwise.
Does not block the calling thread.
void count_down_and_wait( );
Decrements the internal count by 1. If the resulting count is not 0, blocks the
calling thread until the internal count is decremented to 0 by one or more
other threads calling count_down()
.
There are no copy or assignment operations.
count_down()
synchronize with any thread
returning from wait()
. All calls to count_down()
synchronize with any thread that gets a true value from try_wait()
.
A barrier maintains an internal thread counter that is initialized when the barrier is created. Threads may decrement the counter and then block waiting until the specified number of threads are blocked. All threads will then be woken up, and the barrier will reset. In addition, there is a mechanism to change the thread count value after the count reaches 0.
constructor( size_t );
The parameter is the initial value of the internal thread counter.
Throws std::invalid_argument
if the specified count is 0.
constructor( size_t, std::function<void()> );
The parameters are the initial value of the internal thread counter, and a function that will be invoked when the counter reaches 0.
Throws std::invalid_argument
if the specified count is 0.
constructor( size_t, std::function<size_t()> );
The parameters are the initial value of the internal thread counter, and a function that will be invoked when the counter reaches 0. The return value from the function will be used to reset the internal thread counter.
Throws std::invalid_argument
if the specified count is 0.
destructor( );
Destroys the barrier. If the barrier is destroyed while other threads are in
count_down_and_wait()
, the behaviour is undefined.
void count_down_and_wait( );
Decrements the internal thread count by 1. If the resulting count is not 0,
blocks the calling thread until the internal count is decremented to 0 by one
or more other threads calling count_down_and_wait()
.
Before any threads are released, the completion function registered in the constructor will be invoked (if specified and non-NULL). Note that the completion function may be invoked in the context of one of the blocked threads. When the completion function returns, the internal thread count will be reset, and all blocked threads will be unblocked. If the barrier was created with a void function, then the internal thread count will be reset to the initial thread count specified in the contructor. If the barrier was created with a function returning size_t, then the thread count will be reset to the function's return value. It is illegal to return 0.
Note that it is safe for a thread to re-enter count_down_and_wait()
immediately. It is not necessary to ensure that all blocked threads have exited
count_down_and_wait()
before one thread re-enters it.
There are no copy or assignment operations.
Note that the barrier does not have separate count_down()
and
wait()
methods. Every thread that counts down will then block until
all threads have counted down. Hence only the
count_down_and_wait()
method is supported.
count_down_and_wait()
, the
call to count_down_and_wait()
in X synchronizes with the return from
count_down_and_wait()
in Y.
void DoWork(threadpool* pool) {
latch completion_latch(NTASKS);
for (int i = 0; i < NTASKS; ++i) {
pool->add_task([&] {
// perform work
...
completion_latch.count_down();
}));
}
// Block until work is done
completion_latch.wait();
}
An example of the second use case is shown below. We need to load data and then
process it using a number of threads. Loading the data is I/O bound, whereas
starting threads and creating data structures is CPU bound. By running these in
parallel, throughput can be increased.
void DoWork() {
latch start_latch(1);
vector<thread*> workers;
for (int i = 0; i < NTHREADS; ++i) {
workers.push_back(new thread([&] {
// Initialize data structures. This is CPU bound.
...
start_latch.wait();
// perform work
...
}));
}
// Load input data. This is I/O bound.
...
// Threads can now start processing
start_latch.count_down();
}
The barrier can be used to co-ordinate a set of threads carrying out a repeated task. The number of threads can be adjusted dynamically to respond to changing requirements.
In the example below, a number of threads are performing a multi-stage task. Some tasks may require fewer steps than others, meaning that some threads may finish before others. We reduce the number of threads waiting on the barrier when this happens.
void DoWork() {
Tasks& tasks;
size_t initial_threads;
atomic<size_t> current_threads(initial_threads)
vector<thread*> workers;
// Create a barrier, and set a lambda that will be invoked every time the
// barrier counts down. If one or more active threads have completed,
// reduce the number of threads.
std::function rf = [&] { return current_threads;};
barrier task_barrier(n_threads, rf);
for (int i = 0; i < n_threads; ++i) {
workers.push_back(new thread([&] {
bool active = true;
while(active) {
Task task = tasks.get();
// perform task
...
if (finished(task)) {
current_threads--;
active = false;
}
task_barrier.count_down_and_wait();
}
});
}
// Read each stage of the task until all stages are complete.
while (!finished()) {
GetNextStage(tasks);
}
}
The synopsis is as follows.
class latch {
public:
explicit latch(size_t count);
~latch();
void count_down();
void wait();
bool try_wait();
void count_down_and_wait();
};
class barrier {
public:
explicit barrier(size_t num_threads) throw (std::invalid_argument);
barrier(size_t num_threads, std::function<void()> completion) throw (std::invalid_argument);
barrier(size_t num_threads, void (*completion)()) throw (std::invalid_argument);
barrier(size_t num_threads, std::function<size_t()> completion) throw (std::invalid_argument);
barrier(size_t num_threads, size_t (*completion)()) throw (std::invalid_argument);
~barrier();
void count_down_and_wait();
};
This section provides mechanisms for thread coordination: latches and barriers. These mechanisms allow multiple threads to block until a given operation has completed.
namespace std {
class latch {
public:
explicit latch(size_t count);
~latch();
void count_down();
void wait();
bool try_wait();
void count_down_and_wait();
};
}
The class latch
provides a single-use thread coordination mechanism.
An individual latch instance maintains an internal counter that is initialized
when the latch is created.
One or more threads may block waiting until the counter is decremented to 0.
latch(size_t count);
Effects: Constructs an object of type latch. The parameter is the initial value of the internal counter.
Throws: Nothing.
~latch();
Effects: Destroys the latch.
Requires: There are no threads currently calling
wait()
,count_down()
orcount_down_and_wait()
. If there are, the behaviour is undefined.Throws: Nothing.
void count_down();
Effects: Decrements the internal count by 1, and returns. If the count reaches 0, any threads blocked in
wait()
will be released.Synchronization: All calls to
count_down()
synchronize with any thread returning fromwait()
. All calls tocount_down()
synchronize with any thread that obtains a true return value fromtry_wait()
.Throws:
std::logic_error
if the internal count is already 0.system_error
when an exception is required.
void wait();
Effects: Blocks the calling thread until the internal count is decremented to 0 by one or more other threads calling
count_down()
. If the count is already 0, returns immediately.Synchronization: See
count_down()
.Throws:
system_error
when an exception is required.
bool try_wait();
Effects: Returns true if the internal count has been decremented to 0 by one or more other threads calling
count_down()
, and false otherwise. Does not block the calling thread.Synchronization: See
count_down()
.Throws: Nothing.
void count_down_and_wait();
Effects: Decrements the internal count by 1. If the resulting internal count is not 0, blocks the calling thread until the internal count is decremented to 0 by one or more other threads calling
count_down()
orcount_down_and_wait
.Synchronization: All calls to
count_down_and_wait()
synchronize with any thread returning fromwait()
. All calls tocount_down_and_wait()
synchronize with any thread that obtains a true return value fromtry_wait()
.Throws:
std::logic_error
if the internal count is already 0.system_error
when an exception is required.
class barrier {
public:
explicit barrier(size_t num_threads) throw (std::invalid_argument);
barrier(size_t num_threads, std::function<void()> completion) throw (std::invalid_argument);
barrier(size_t num_threads, void (*completion)()) throw (std::invalid_argument);
barrier(size_t num_threads, std::function<size_t()> completion) throw (std::invalid_argument);
barrier(size_t num_threads, size_t (*completion)()) throw (std::invalid_argument);
~barrier();
void count_down_and_wait();
};
The class barrier
provides a re-usable coordination mechanism.
An individual barrier maintains an internal thread counter that is
initialized when the barrier is created.
One or more threads may decrement the counter and then block waiting
until the specified number of threads are similarly blocked.
All threads will then be unblocked, and the barrier will reset.
While resetting the barrier, there is a mechanism to
change the internal thread count value.
barrier(size_t count);
Effects: Constructs an object of type barrier. The parameter is the initial value of the internal counter.
Throws:
std::invalid_argument
if the specified count is 0.
barrier(size_t num_threads, std::function<void()> completion);
Effects: Constructs an object of type barrier. The parameters are the initial value of the internal counter, and a
std::function
completion that will be invoked when the internal count reaches 0. If the function argument is NULL, it will be ignored.Throws:
std::invalid_argument
if the specified count is 0.
barrier(size_t num_threads, void (*completion)()));
Effects: Constructs an object of type barrier. The parameters are the initial value of the internal counter, and a completion function that will be invoked when the internal count reaches 0. If the function argument is NULL, it will be ignored.
Throws:
std::invalid_argument
if the specified count is 0.
barrier(size_t num_threads, std::function<size_t()> completion);
Effects: Constructs an object of type barrier. The parameters are the initial value of the internal counter, and a
std::function
completion function that will be invoked when the internal count reaches 0. If the function argument is NULL, it will be ignored.Throws:
std::invalid_argument
if the specified count is 0.
barrier(size_t num_threads, size_t (*completion)()));
Effects: Constructs an object of type barrier. The parameters are the initial value of the internal counter, and a completion function that will be invoked when the internal count reaches 0. If the function argument is NULL, it will be ignored.
Throws:
std::invalid_argument
if the specified count is 0.
~barrier();
Effects: Destroys the barrier.
Requires: There are no threads currently calling
count_down_and_wait()
. If there are, the behaviour is undefined.Throws: Nothing.
void count_down_and_wait();
Effects: Decrements the internal thread count by 1. If the resulting count is not 0, blocks the calling thread until the internal count is decremented to 0 by one or more other threads calling
count_down_and_wait()
. Before any threads are unblocked, the completion function parameter passed to the constructor (if specified and non-NULL) will be invoked. When the completion function returns, the internal thread count will be reset, and all blocked threads will be unblocked. If the barrier was created with a void completion function, then the internal thread count will be reset to the initial thread count specified in the contructor. If the barrier was created with a completion function returning size_t, then the thread count will be reset to the function's return value. It is illegal to return 0, and astd::logic_error
will be thrown in the completion function's thread if this occurs.Note: It is safe for a thread to re-enter
count_down_and_wait()
immediately. It is not necessary to ensure that all blocked threads have exitedcount_down_and_wait()
before one thread re-enters it.Note:The thread that the completion function is invoked in is implementation dependent. It may be invoked in one of the threads that has called
count_down_and_wait()
.Synchronization: All calls to
count_down_and_wait()
synchronize with any thread returning fromcount_down_and_wait()
. The invocation of the completion function synchronizes with any thread that has calledcount_down_and_wait()
.Throws:
std::logic_error
in the thread of the completion function if the completion function returns 0.system_error
when an exception is required.