P3481R1
std::execution::bulk() issues

Published Proposal,

This version:
http://wg21.link/P3481R1
Authors:
(Garmin)
(Intel)
Audience:
SG1, LEWG
Project:
ISO/IEC 14882 Programming Languages — C++, ISO/IEC JTC1/SC22/WG21

Abstract

This paper explores the issues with the std::execution::bulk algorithm and proposes a way to address them, making the API better for users.

1. Change history

1.1. R1

2. Motivation

When [P2300R10] was merged into the working draft, a subsequent review of the GitHub issue tracker uncovered several outstanding issues relating to the use of the bulk algorithm from [P2300R10] that had not been addressed during the LEWG review. These issues are mostly about better expressing what bulk can and cannot do.

For most algorithms defined in [P2300R10], the default implementation typically describes the desired behavior. However, this is not the case for bulk.

The expectation for bulk is that it will be customized for most use cases. The default implementation is a common-denominator solution that is not necessarily optimal for the majority of scenarios.

The description in [exec.bulk] specifies the default implementation but does not outline any requirements for customizations. This creates uncertainty for users regarding what they can expect when invoking bulk.

This paper addresses this issue by specifying what is permissible in a customization of bulk.

A second issue this paper seeks to resolve is the lack of an execution policy for the provided functor. For example, imagine a scheduler that supports only the par_unseq execution policy for parallelism (as certain GPU schedulers do) and wants to customize bulk. If the functor is unsuitable for the par_unseq execution policy, the current API shape gives users no mechanism to express this.

A third issue this paper addresses is the absence of chunking in the default implementation of bulk. Invoking the functor for every element in a range can lead to performance challenges in certain use cases.

2.1. Allowed behavior for bulk customizations

From the specification of bulk in [exec.bulk], if we have customized bulk, the following are unclear:

2.2. Execution policy for the given functor

Similar to for_each ([alg.foreach]), users might expect the bulk algorithm to take an execution policy as an argument. The execution policy would define the execution capabilities of the given function.

There are two main reasons for wanting an execution policy to accompany the function passed to bulk:

  1. To better tailor the algorithm to the available hardware.

  2. To improve usage in generic code.

Currently, we have hardware that supports both par and par_unseq execution policy semantics, as well as hardware that only supports par_unseq. Standardizing the API without execution policies implies that the language chooses a default execution policy for bulk. This decision could favor certain hardware vendors over others.

From a generic programming perspective, consider a scenario where a user implements a function like the following:

void process(std::vector<int>& data, std::invocable<int> auto f) {
  // call `f` for each element in `data`; try using `bulk`.
  // can we call `f` concurrently?
}

In the body of the generic function process, we do not know the constraints imposed on f. Can we assume that f can be invoked with the par_unseq execution policy? Can it be invoked with the par execution policy? Or should we default to the seq execution policy?

Since there is no way to infer the execution policy that should apply to f, the implementation of process must be conservative and default to the seq execution policy. However, in most cases, the par or par_unseq execution policy would likely be more appropriate.

2.3. Chunking

The current specification of bulk ([exec.bulk]) does not support chunking. This limitation presents a performance issue for certain use cases.

If two iterations can potentially be executed together more efficiently than in isolation, chunking would provide a performance benefit.

Let us take an example:

std::atomic<std::uint32_t> sum{0};
std::vector<std::uint32_t> data;
ex::sender auto s = ex::bulk(ex::just(), std::execution::par, 100'000,
  [&sum,&data](std::uint32_t idx) {
    sum.fetch_add(data[idx]);
  });

In this example, we perform 100,000 atomic operations. A more efficient implementation would allow a single atomic operation for each chunk of work, enabling each chunk to perform local summation. This approach might look like:

std::atomic<std::uint32_t> sum{0};
std::vector<std::uint32_t> data;
ex::sender auto s = ex::bulk_chunked(ex::just(), std::execution::par, 100'000,
  [&sum,&data](std::uint32_t begin, std::uint32_t end) {
    std::uint32_t partial_sum = 0;
    while (begin != end) {
      partial_sum += data[begin++];
    }
    sum.fetch_add(partial_sum);
  });

Other similar examples exist where grouping operations together can improve performance. This is particularly important when the functor passed to bulk cannot be inlined or when the functor is expensive to invoke.

3. API

Define bulk to match the following API:

// NEW: algorithm to be used as a default basis operation
template<execution::sender Predecessor,
         typename ExecutionPolicy,
         std::integral Size,
         std::invocable<Size, Size, values-sent-by(Predecessor)...> Func
>
execution::sender auto bulk_chunked(Predecessor pred,
                                    ExecutionPolicy&& pol,
                                    Size size,
                                    Func f);

// NEW: algorithm to ensure one execution agent per iteration
template<execution::sender Predecessor,
         typename ExecutionPolicy,
         std::integral Size,
         std::invocable<Size, values-sent-by(Predecessor)...> Func
>
execution::sender auto bulk_unchunked(Predecessor pred,
                                      ExecutionPolicy&& pol,
                                      Size size,
                                      Func f);

template<execution::sender Predecessor,
         typename ExecutionPolicy,
         std::integral Size,
         std::invocable<Size, values-sent-by(Predecessor)...> Func
>
execution::sender auto bulk(Predecessor pred,
                            ExecutionPolicy&& pol, // NEW
                            Size size,
                            Func f) {
    // Default implementation
    return bulk_chunked(
        std::move(pred),
        std::forward<ExecutionPolicy>(pol),
        size,
        [f=std::move(f)]<typename... Vs>(
            Size begin, Size end, Vs&&... vs)
            noexcept(std::is_nothrow_invocable_v<Func, Size, Vs&...>) {
            while (begin != end) {
                f(begin++, vs...);
            }
        });
}

The bulk_chunked algorithm is a customization point returning a sender that describes work with the following properties:

The bulk_unchunked algorithm is a customization point returning a sender that describes work with the following properties:

The bulk algorithm is a customization point. The default implementation calls bulk_chunked as shown above.

4. Design discussions

4.1. Use chunked version as a basis operation

To address the performance problem described in the motivation, we propose to add a chunked version for bulk. This allows implementations to process the iteration space in chunks, and take advantage of the locality of the chunk processing. This is useful when publishing the effects of the functor may be expensive (the example from the motivation) and when the given functor cannot be inlined.

The implementation of bulk_chunked can use dynamically sized chunks that adapt to the workload at hand. For example, computing the results of a Mandelbrot fractal on a line is an unbalanced process. Some values can be computed very fast, while others take longer to compute.

Passing a range as parameters to the functor given to bulk_chunked is similar to the way Intel TBB’s parallel_for operates (see [parallel_for]).

An implementation of bulk can be easily built on top of bulk_chunked without losing performance (as shown in the Proposal section).

4.2. Also define an unchunked version

For GPUs, it is important to have a version of bulk that ensures one execution agent per iteration. Defining bulk solely in terms of bulk_chunked would not be efficient for GPUs.

Moreover, for composability reasons, we need a way to name the version of bulk that does not perform any chunking. For example, an algorithm tailored for GPUs that depends on bulk would not want chunking and, since it cannot control the customizations of bulk, it needs a guarantee that it does not accidentally call a chunked version of bulk.

For these reasons, we need a version of bulk that doesn’t do any chunking.

4.3. Using execution policies

As discussed in the § 2 Motivation section, not being able to specify an execution policy for the bulk algorithm is a downside. The only reasonable default execution policy is seq, because defaulting to anything else might lead to deadlocks. On the other hand, seq does not match the use cases that bulk (and bulk_chunked/bulk_unchunked) is aimed at, which expect parallelism, that is, the unseq, par, or par_unseq policies. Thus, bulk has to have an execution policy parameter.

One criticism of this solution is that each invocation of bulk needs to contain an extra parameter that is typically verbose to type.

One idea the authors considered to solve this is to provide versions of the algorithms that have the execution policies already baked in. Something like:

auto bulk_seq(auto prev, auto size, auto f);
auto bulk_unseq(auto prev, auto size, auto f);
auto bulk_par(auto prev, auto size, auto f);
auto bulk_par_unseq(auto prev, auto size, auto f);

auto bulk_chunked_seq(auto prev, auto size, auto f);
auto bulk_chunked_unseq(auto prev, auto size, auto f);
auto bulk_chunked_par(auto prev, auto size, auto f);
auto bulk_chunked_par_unseq(auto prev, auto size, auto f);

auto bulk_unchunked_seq(auto prev, auto size, auto f);
auto bulk_unchunked_unseq(auto prev, auto size, auto f);
auto bulk_unchunked_par(auto prev, auto size, auto f);
auto bulk_unchunked_par_unseq(auto prev, auto size, auto f);

We dropped this idea, as it does not scale.

On the other hand, it’s quite easy to write a thin wrapper on top of bulk / bulk_chunked on the user side that calls the algorithm with the execution policy of choice. Something like:

auto user_bulk_par(auto prev, auto size, auto f) {
  return std::execution::bulk(prev, std::execution::par, size, f);
}

4.4. Conflicting execution policies

In the [P2500R2] design we previously agreed that there might be an execution policy attached to a scheduler. The question that comes to mind is: what should we do if the execution policy passed via bulk’s parameters conflicts with the execution policy attached to a scheduler?

This is another area to explore. The execution policy passed to bulk clearly describes the permitted ways of parallelizing invocations of the bulk callable object, while it is not entirely obvious what an execution policy attached to a scheduler means outside of the [P2500R2] proposal.

We might choose different strategies (not all of them are mutually exclusive):

In any case, exploring this problem is out of scope for this paper. For the purposes of this paper, we need to ensure that the way the given function is called is compatible with the execution policy passed to bulk.

4.5. Expectations on customizations

The current specification of bulk in [exec.bulk] doesn’t define any constraints or expectations for customizations of the algorithm. The default implementation (a serial version of bulk) is expected to be very different from its customizations.

Having customizations in mind, we need to define the minimal expectations for calling bulk. We propose the following:

We want to require that the given functor be copy-constructible so that bulk accepts the same kind of functor as for_each with an execution policy. Adding an execution policy to bulk/bulk_chunked/bulk_unchunked would also align them with for_each.

If f is allowed to be invoked concurrently, then implementations may want to copy the arguments passed to it.

Another important point for a bulk operation is how it should react to cancellation requests. We want to specify that bulk should be able to react to cancellation requests, but not make this mandatory. It shall also be possible for vendors to completely ignore cancellation requests (if, for example, supporting cancellation would actually slow down the main use cases that the vendor is optimizing for). Also, the way cancellation is implemented should be left unspecified.

Checking the cancellation token on every iteration may be expensive in certain cases. A cancellation check requires an acquire memory barrier (which may be too costly for really small functors) and might prevent vectorization. Thus, implementations may want to check for cancellation every N iterations; N can be a statically known number or can be determined dynamically.

As bulk, bulk_chunked, and bulk_unchunked are customization points, we need to apply these constraints to all three of them.

4.6. Exception handling

While there are multiple solutions to specify which errors can be thrown by bulk, the most sensible ones seem to be:

  1. pick an arbitrary exception thrown by one of the invocations of f (maybe using an atomic operation to select the first thrown exception),

  2. reduce the exceptions to another exception that can carry one or more of the thrown exceptions using a user-defined reduction operation (similar to using exception_list as discussed by [P0333R0]),

  3. allow implementations to produce a new exception type (e.g., to represent failures outside of f, or to represent failure to transmit the original exception to the receiver)

One should note that option 2 can be seen as a particular case of option 3.

Also, option 2 seems to be more complex than option 1, without providing any palpable benefits to the users.

The third option is very useful when implementations may fail outside of the calls to the given functor. Also, there may be cases in which exceptions cannot be transported from the place they were thrown to the place they need to be reported.

Based on experience with existing parallel frameworks, we are inclined to recommend option 1 because:

On top of that, for special cases we also want to allow option 3.

5. Specification

5.1. In [execution.syn]

After struct bulk_t { unspecified };, add:

struct bulk_chunked_t { unspecified };
struct bulk_unchunked_t { unspecified };

After inline constexpr bulk_t bulk{};, add:

inline constexpr bulk_chunked_t bulk_chunked{};
inline constexpr bulk_unchunked_t bulk_unchunked{};

5.2. In [exec.bulk]

Rename the title of the section from “execution::bulk” to “execution::bulk, execution::bulk_chunked, and execution::bulk_unchunked”.

Apply the following changes (no track changes for paragraph numbering):

  1. bulk, bulk_chunked, and bulk_unchunked run a task repeatedly for every index in an index space.

  2. The names bulk, bulk_chunked, and bulk_unchunked denote pipeable sender adaptor objects. Let bulk-algo be either bulk, bulk_chunked, or bulk_unchunked. For subexpressions sndr, policy, shape, and f, let Policy be remove_cvref_t<decltype(policy)> and Shape be decltype(auto(shape)). The expression bulk-algo(sndr, policy, shape, f) is ill-formed if any of the following is true:

    • decltype((sndr)) does not satisfy sender,

    • is_execution_policy_v<Policy> is false,

    • Shape does not satisfy integral,

    • decltype((f)) does not model the copy_constructible concept.

  3. Otherwise, the expression bulk-algo(sndr, policy, shape, f) is expression-equivalent to:

    transform_sender(
      get-domain-early(sndr),
      make-sender(bulk-algo, product-type{policy, shape, f}, sndr))
    

    except that sndr is evaluated only once.

  4. The exposition-only class template impls-for ([exec.snd.general]) is specialized for bulk_t, bulk_chunked_t, and bulk_unchunked_t as follows:

    namespace std::execution {
      template<>
      struct impls-for<bulk-algo> : default-impls {
        static constexpr auto complete = see below;
      };
    }
    
    1. The member impls-for<bulk_unchunked_t>::complete is initialized with a callable object equivalent to the following lambda:

      []<class Index, class State, class Rcvr, class Tag, class... Args>
        (Index, State& state, Rcvr& rcvr, Tag, Args&&... args) noexcept
        -> void requires see below {
          if constexpr (same_as<Tag, set_value_t>) {
            auto& [policy, shape, f] = state;
            constexpr bool nothrow = noexcept(f(auto(shape), args...));
            TRY-EVAL(rcvr, [&]() noexcept(nothrow) {
              for (decltype(auto(shape)) i = 0; i < shape; ++i) {
                f(auto(i), args...);
              }
              Tag()(std::move(rcvr), std::forward<Args>(args)...);
            }());
          } else {
            Tag()(std::move(rcvr), std::forward<Args>(args)...);
          }
        }
      
      1. The expression in the requires-clause of the lambda above is true if and only if Tag denotes a type other than set_value_t or if the expression f(auto(shape), args...) is well-formed.

    2. The member impls-for<bulk_chunked_t>::complete is initialized with a callable object equivalent to the following lambda:

      []<class Index, class State, class Rcvr, class Tag, class... Args>
        (Index, State& state, Rcvr& rcvr, Tag, Args&&... args) noexcept
        -> void requires see below {
          if constexpr (same_as<Tag, set_value_t>) {
            auto& [policy, shape, f] = state;
            constexpr bool nothrow = noexcept(f(auto(shape), auto(shape), args...));
            TRY-EVAL(rcvr, [&]() noexcept(nothrow) {
              f(0, auto(shape), args...);
              Tag()(std::move(rcvr), std::forward<Args>(args)...);
            }());
          } else {
            Tag()(std::move(rcvr), std::forward<Args>(args)...);
          }
        }
      
      1. The expression in the requires-clause of the lambda above is true if and only if Tag denotes a type other than set_value_t or if the expression f(auto(shape), auto(shape), args...) is well-formed.

    3. The member impls-for<bulk_t>::complete is initialized with a callable object equivalent to the following lambda:

      []<class Index, class State, class Rcvr, class Tag, class... Args>
        (Index, State& state, Rcvr& rcvr, Tag, Args&&... args) noexcept
        -> void requires see below {
          auto& [policy, shape, f] = state;
          constexpr bool nothrow = noexcept(f(auto(shape), args...));
          auto new_f = [&f](decltype(auto(shape)) begin,
                            decltype(auto(shape)) end,
                            auto&... vs) noexcept(nothrow) {
            while (begin != end) f(begin++, vs...);
          };
          impls-for<bulk_chunked_t>::complete(Index(),
            product-type{policy, shape, std::move(new_f)},
            rcvr, Tag(), std::forward<Args>(args)...);
        }
      
      1. The expression in the requires-clause of the lambda above is true if and only if Tag denotes a type other than set_value_t or if the expression f(auto(shape), args...) is well-formed.

  5. If bulk-algo is bulk_chunked, bulk_unchunked, or bulk, let the subexpression out_sndr denote the result of the invocation bulk-algo(sndr, policy, shape, f) or an object equal to such, and let the subexpression rcvr denote a receiver such that the expression connect(out_sndr, rcvr) is well-formed. The expression connect(out_sndr, rcvr) has undefined behavior unless it creates an asynchronous operation ([async.ops]) that, when started:
    • if sndr has a successful completion, where args is a pack of lvalue subexpressions referring to the value completion result datums of sndr, then:

      • if out_sndr also completes successfully, then:

        • if bulk-algo is bulk or bulk_unchunked, invokes f(i, args...) for every i of type Shape from 0 to shape;

        • if bulk-algo is bulk_chunked, invokes f(b, e, args...) zero or more times with pairs b and e of type Shape in the range [0, shape], such that for every i of type Shape from 0 to shape there is an invocation with a pair b and e such that b <= i < e;

      • if out_sndr completes with set_error(rcvr, e), the asynchronous operation may invoke a subset of the invocations of f as described above before the completion signal, and e is either:

        • an exception thrown by an invocation of f, or

        • an exception derived from runtime_error;

      • if out_sndr completes with set_stopped(rcvr), the asynchronous operation may invoke a subset of the invocations of f as described above before the completion signal;

    • if sndr does not complete with set_value(...), the completion signal is forwarded to rcvr;

    • the function f is invoked in a way that is compatible with the execution policy policy.

  6. If bulk-algo is bulk_chunked, bulk_unchunked, or bulk, then the asynchronous operation corresponding to bulk-algo(sndr, policy, shape, f) is allowed (but not required) to:

    • complete with set_stopped() if cancellation is requested (through the connected receiver);

    • ignore cancellation requests;

    • make decaying copies of the values produced by the predecessor sender, if the values are copyable.

6. Polls

6.1. SG1, Wrocław, Poland, 2024

[P3481R0] was presented at the SG1 meeting in November 2024 in Wrocław, Poland. SG1 provided the following feedback in the form of polls:

  1. If we have chunking, we need to expose a control on the chunk size.

    | SF | F | N | A | SA |
    | 1  | 2 | 3 | 1 | 1  |
    

    No consensus

  2. We need a version of the bulk API that presents the chunk to the callable in order to implement the parallel algorithms

    | SF | F | N | A | SA |
    | 4  | 2 | 1 | 0 | 1  |
    

    Consensus in favor

  3. We need a version of the bulk API that creates an execution agent per iteration.

    | SF | F | N | A | SA |
    | 2  | 3 | 3 | 0 | 0  |
    

    Unanimous consent

  4. We believe / desire:

    1. Bulk needs an execution policy to describe the callable, IF the scheduler also has an execution policy (for debugging for example) then a conservative choice should be used (seq is more conservative than par)

    2. No SG1 concerns with the proposed exception handling

    3. No change to status quo on default implementation of bulk being serial

    4. No change to status quo on bulk having a predecessor

    5. Forms of bulk needed:

      • bulk -> f(i) -> bulk_chunked

      • bulk_chunked -> f(b, e)

      • bulk_unchunked -> f(i) "execution agent per iteration"

    | SF | F | N | A | SA |
    | 5  | 2 | 1 | 0 | 0  |
    

    Unanimous consent

References

Informative References

[ALG.FOREACH]
ISO WG21. Working Draft: Programming Languages — C++ -- For each. URL: https://eel.is/c++draft/alg.foreach
[EXEC.BULK]
ISO WG21. Working Draft: Programming Languages — C++ -- `execution::bulk`. URL: https://eel.is/c++draft/exec.bulk
[P0333R0]
Bryce Lelbach. Improving Parallel Algorithm Exception Handling. 15 May 2016. URL: https://wg21.link/p0333r0
[P2300R10]
Eric Niebler, Michał Dominiak, Georgy Evtushenko, Lewis Baker, Lucian Radu Teodorescu, Lee Howes, Kirk Shoop, Michael Garland, Bryce Adelstein Lelbach. `std::execution`. 28 June 2024. URL: https://wg21.link/p2300r10
[P2500R2]
Ruslan Arutyunyan, Alexey Kukanov. C++ parallel algorithms and P2300. 15 October 2023. URL: https://wg21.link/p2500r2
[P3481R0]
Lucian Radu Teodorescu, Lewis Baker, Ruslan Arutyunyan. Summarizing std::execution::bulk() issues. 16 October 2024. URL: https://wg21.link/p3481r0
[PARALLEL_FOR]
Intel. Intel® oneAPI Threading Building Blocks Developer Guide and API Reference -- parallel_for. URL: https://www.intel.com/content/www/us/en/docs/onetbb/developer-guide-api-reference/2021-6/parallel-for.html