parallelfuture
A library that implements various high-level parallelism primitives, such as
parallel foreach over arbitrary ranges, parallel map and reduce,
futures, and a task pool.
By:
David Simcha
Examples:
// Create a TaskPool object with the default number of threads.
auto poolInstance = new TaskPool();
// Create some data to work on.
uint[] numbers = new uint[1_000];
// Fill in this array in parallel, using default block size.
// Note: Be careful when writing to adjacent elements of an array from
// different threads, as this can cause word tearing bugs when
// the elements aren't properly aligned or aren't the machine's native
// word size. In this case, though, we're ok.
foreach(i; poolInstance.parallel( iota(0, numbers.length)) ) {
numbers[i] = i;
}
// Make sure it works.
foreach(i; 0..numbers.length) {
assert(numbers[i] == i);
}
stderr.writeln("Done creating nums.");
// Parallel foreach also works on non-random access ranges, albeit
// less efficiently.
auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
foreach(num; poolInstance.parallel(myNumbers)) {
assert(num % 7 > 0 && num < 1000);
}
stderr.writeln("Done modulus test.");
// Use parallel map to calculate the square of each element in numbers,
// and make sure it's right.
uint[] squares = poolInstance.map!"a * a"(numbers, 100);
assert(squares.length == numbers.length);
foreach(i, number; numbers) {
assert(squares[i] == number * number);
}
stderr.writeln("Done squares.");
// Sum up the array in parallel with the current thread.
auto sumFuture = poolInstance.task!( reduce!"a + b" )(numbers);
// Go off and do other stuff while that future executes:
// Find the sum of squares of numbers.
ulong sumSquares = 0;
foreach(elem; numbers) {
sumSquares += elem * elem;
}
// Ask for our result. If the pool has not yet started working on
// this task, spinWait() automatically steals it and executes it in this
// thread.
uint mySum = sumFuture.spinWait();
assert(mySum == 999 * 1000 / 2);
// We could have also computed this sum in parallel using parallel
// reduce.
auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
assert(mySum == mySumParallel);
stderr.writeln("Done sums.");
// Execute an anonymous delegate as a task.
auto myTask = poolInstance.task({
synchronized writeln("Our lives are parallel...Our lives are parallel.");
});
// Parallel foreach loops can also be nested:
auto nestedOuter = iota('a', cast(char) ('d' + 1));
auto nestedInner = iota(0, 5);
foreach(letter; poolInstance.parallel(nestedOuter, 1)) {
foreach(number; poolInstance.parallel(nestedInner, 1)) {
synchronized writeln(cast(char) letter, number);
}
}
// Block until all jobs are finished and then shut down the thread pool.
poolInstance.waitStop();
License:
Boost Software License - Version 1.0 - August 17th, 2003
Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:
The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
- ReturnType!(F)
runCallable
(F, Args...)(F fpOrDelegate, Args args);
- Calls a delegate or function pointer with args. This is basically an
adapter that makes Task work with delegates, function pointers and
functors instead of just aliases.
- struct
Task
(alias fun,Args...);
- A struct that encapsulates the information about a task, including
its current status, what pool it was submitted to, and its arguments.
Note:
If a Task has been submitted to the pool, is being stored in a stack
frame, and has not yet finished, the destructor for this struct will
automatically call yieldWait() so that the task can finish and the
stack frame can be destroyed safely.
- ReturnType
spinWait
();
- If the task isn't started yet, steal it and do it in this thread.
If it's done, return its return value, if any. If it's in progress,
busy spin until it's done, then return the return value.
This function should be used when you expect the result of the
task to be available relatively quickly, on a timescale shorter
than that of an OS context switch.
- ReturnType
yieldWait
();
- If the task isn't started yet, steal it and do it in this thread.
If it's done, return its return value, if any. If it's in progress,
wait on a condition variable.
This function should be used when you expect the result of the
task to take a while, as waiting on a condition variable
introduces latency, but results in negligible wasted CPU cycles.
- ReturnType
workWait
();
- If this task is not started yet, steal it and execute it in the current
thread. If it is finished, return its result. If it is in progress,
execute any other available tasks from the task pool until this one
is finished. If no other tasks are available, yield wait.
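The trade-off between busy spinning and waiting on a condition variable can be sketched with druntime primitives alone. This is a generic illustration, not the library's implementation: the helper names spinUntilDone and waitOnCondition are hypothetical, and the worker here is a plain core.thread.Thread rather than a pool worker.

```d
import core.atomic : atomicLoad, atomicStore;
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;

/// Busy-spin until a worker thread sets a flag (the spinWait() style).
/// Hypothetical helper for illustration only.
bool spinUntilDone()
{
    shared bool done = false;
    auto worker = new Thread({ atomicStore(done, true); });
    worker.start();
    while (!atomicLoad(done)) { }   // burns CPU cycles, but reacts at once
    worker.join();
    return atomicLoad(done);
}

/// Sleep on a condition variable until notified (the yieldWait() style).
/// Hypothetical helper for illustration only.
bool waitOnCondition()
{
    auto mutex = new Mutex;
    auto cond = new Condition(mutex);
    bool done = false;              // only touched while holding the mutex
    auto worker = new Thread({
        synchronized (mutex)
        {
            done = true;
            cond.notify();
        }
    });
    worker.start();
    synchronized (mutex)
    {
        while (!done)               // loop guards against spurious wakeups
            cond.wait();
    }
    worker.join();
    return done;
}

void main()
{
    assert(spinUntilDone());
    assert(waitOnCondition());
}
```

Spinning keeps the waiting thread on the CPU, so it suits results that arrive almost immediately. The condition variable frees the CPU at the cost of wake-up latency.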
- bool
done
();
- Task!(fun,Args)
task
(alias fun, Args...)(Args args);
- Creates a task that calls an alias.
Examples:
auto pool = new TaskPool();
uint[] foo = someFunction();
// Create a task to sum this array in the background.
auto myTask = task!( reduce!"a + b" )(foo);
pool.put(myTask);
// Do other stuff.
// Get value. Steals the job and executes it in this thread if it
// hasn't been started by a worker thread yet.
writeln("Sum = ", myTask.spinWait());
Note:
This method of creating tasks allocates on the stack and requires an explicit
submission to the pool. It is designed for tasks that are to finish before
the function in which they are created returns. If you want to escape the
Task object from the function in which it was created or prefer to heap
allocate and automatically submit to the pool, see pool.task().
- Task!(runCallable,TypeTuple!(F,Args))
task
(F, Args...)(scope F delegateOrFp, Args args);
- Creates a task that calls a function pointer, delegate, or functor.
This works for anonymous delegates.
Examples:
auto pool = new TaskPool();
auto myTask = task({
stderr.writeln("I've completed a task.");
});
pool.put(myTask);
// Do other stuff.
myTask.yieldWait();
Notes:
This method of creating tasks allocates on the stack and requires an explicit
submission to the pool. It is designed for tasks that are to finish before
the function in which they are created returns. If you want to escape the
Task object from the function in which it was created or prefer to heap
allocate and automatically submit to the pool, see pool.task().
In the case of delegates, this function takes a scope delegate to prevent
the allocation of closures, since its intended use is for tasks that will
be finished before the function in which they're created returns.
pool.task() takes a non-scope delegate and will allow the use of closures.
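The effect of a scope delegate parameter can be sketched in plain D without the library. The helper runNow below is hypothetical. Marking the caller @nogc is used here as a compile-time check that no closure is heap-allocated for the delegate literals, under the assumption that the compiler honors scope on delegate parameters as the language specification describes.

```d
/// Hypothetical helper: runs the delegate before returning, so it can
/// promise (via scope) not to keep a reference to it.
void runNow(scope void delegate() @nogc dg) @nogc
{
    dg();
}

/// The literals capture the local variable counter. Because the parameter
/// is scope, no closure is allocated, and the function can stay @nogc.
int countCalls() @nogc
{
    int counter = 0;
    runNow({ counter++; });
    runNow({ counter++; });
    return counter;
}

void main()
{
    assert(countCalls() == 2);
}
```

A delegate that must outlive the function that created it cannot use this pattern, which is the case the non-scope overload covers.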
- struct
WorkerLocal
(T);
- Struct for creating worker-local storage. Worker-local storage is basically
thread-local storage that exists only for workers in a given pool, is
allocated on the heap in a way that avoids false sharing,
and doesn't necessarily have global scope within any
thread. It can be accessed from any worker thread in the pool that created
it, or the thread that created the pool that created it. Accessing from
other threads will result in undefined behavior.
Since the underlying data for this struct is heap-allocated, this struct
has reference semantics when passed around.
At a more concrete level, the main use case for WorkerLocal is performing
parallel reductions with an imperative, as opposed to functional,
programming style. Therefore, it's useful to treat WorkerLocal as local
to each thread for only the parallel portion of an algorithm.
Examples:
auto pool = new TaskPool;
auto sumParts = pool.createWorkerLocal!(uint)();
foreach(i; pool.parallel(iota(0U, someLargeNumber))) {
// Do complicated stuff.
sumParts.get += resultOfComplicatedStuff;
}
writeln("Sum = ", reduce!"a + b"(sumParts.toRange));
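The idea behind worker-local storage, one accumulator per worker laid out to avoid false sharing and combined serially afterwards, can be sketched with plain threads. This is not how WorkerLocal is implemented. PaddedSlot, makeWorker, and sumWithPerWorkerSlots are hypothetical names, and the 64-byte cache line size is an assumption.

```d
import core.thread : Thread;
import std.algorithm : min;

/// One accumulator per worker, padded so that two accumulators never share
/// a cache line. The 64-byte line size is an assumption.
struct PaddedSlot
{
    ulong value;
    ubyte[56] padding;
}

/// Builds a thread that adds its part of the data into its own slot.
/// A separate function gives every thread its own closure.
Thread makeWorker(const(uint)[] part, PaddedSlot* slot)
{
    return new Thread({
        foreach (x; part)
            slot.value += x;
    });
}

/// Hypothetical helper: sums data using one slot per worker thread.
ulong sumWithPerWorkerSlots(const(uint)[] data, size_t workers)
{
    assert(workers > 0);
    auto slots = new PaddedSlot[workers];
    auto threads = new Thread[workers];
    immutable chunk = (data.length + workers - 1) / workers;

    foreach (w; 0 .. workers)
    {
        immutable lo = min(w * chunk, data.length);
        immutable hi = min(lo + chunk, data.length);
        threads[w] = makeWorker(data[lo .. hi], &slots[w]);
        threads[w].start();
    }
    foreach (t; threads)
        t.join();

    // Serial combine step, analogous to reducing over toRange.
    ulong total = 0;
    foreach (ref s; slots)
        total += s.value;
    return total;
}

void main()
{
    auto data = new uint[1_000];
    foreach (i, ref x; data)
        x = cast(uint) i;
    assert(sumWithPerWorkerSlots(data, 4) == 999 * 1000 / 2);
}
```

No locking is needed in the parallel part because each thread writes only to its own slot. The slots are read from a single thread only after every worker has been joined.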
- @property T
get
();
- Get the current thread's instance. Returns by reference even though
ddoc refuses to say so. Undefined behavior will result if the current
thread is not a worker in the pool that created this instance or the
thread that created the pool that created this instance.
If assertions are enabled and toRange() has been called, then this
WorkerLocal instance is no longer worker-local and an assertion
failure will result when calling this method. This is not checked
when assertions are disabled for performance reasons.
- @property void
get
(T val);
- Assign a value to the current thread's instance. This function has
the same caveats as its overload.
- @property WorkerLocalRange!(T)
toRange
();
- Returns a range view of the values for all threads, which can be used
to process the results of each thread after running the parallel
part of your algorithm. Do NOT use this method in the parallel portion
of your algorithm.
Calling this function will also set a flag indicating
that this struct is no longer thread-local, and attempting to use the
get() method again will result in an assertion failure if assertions
are enabled.
- struct
WorkerLocalRange
(T);
- Range primitives for worker-local storage. The purpose of this is to
access results produced by each worker thread from a single thread once you
are no longer using the worker-local storage from multiple threads.
Do NOT use this struct in the parallel portion of your algorithm.
- @property T
front
();
- @property T
back
();
- void
popFront
();
- void
popBack
();
- @property typeof(this)
save
();
- T
opIndex
(size_t index);
- void
opIndexAssign
(T val, size_t index);
- @property bool
empty
();
- @property size_t
length
();
- class
TaskPool
;
- The task pool class that is the workhorse of this library.
- this();
- Default constructor that initializes a TaskPool with
however many cores are on your CPU, minus 1 because the thread
that initialized the pool will also do work.
Note:
Initializing a pool with zero threads (as would happen in the
case of a single-core CPU) is well-tested and does work.
- this(size_t poolSize);
- Constructs a TaskPool with poolSize worker threads instead of the default.
- final size_t
workerIndex
();
- Gets the index of the current thread relative to this pool. The thread
that created this pool receives an index of 0. The worker threads in
this pool receive indices of 1 through poolSize.
The worker index is useful mainly for maintaining worker-local storage.
BUGS:
Subject to integer overflow errors if more than size_t.max threads
are ever created during the course of a program's execution. This
will likely never be fixed because it's an extreme corner case
on 32-bit and it's completely implausible on 64-bit.
Will silently return undefined results for threads that are not
workers in this pool and did not create this pool.
- WorkerLocal!(T)
createWorkerLocal
(T)(lazy T initialVal = T.init);
- Create an instance of worker-local storage, initialized with a given
value. The value is lazy so that you can, for example, easily
create one instance of a class for each worker.
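Why a lazy initial value yields one object per worker can be shown with a small stand-in. The function makeSlots and the class Scratch are hypothetical and only mimic the initialization step. The sketch relies on the language rule that a lazy argument is re-evaluated each time it is read.

```d
/// Hypothetical per-worker scratch object.
class Scratch
{
    int[] buffer;
}

/// Hypothetical stand-in for per-worker initialization: the lazy argument
/// is re-evaluated on every use, so each slot gets a fresh value.
T[] makeSlots(T)(size_t count, lazy T initialVal)
{
    auto slots = new T[count];
    foreach (ref slot; slots)
        slot = initialVal;   // evaluates the caller's expression again
    return slots;
}

void main()
{
    // "new Scratch" runs three times, once per slot.
    auto slots = makeSlots(3, new Scratch);
    assert(slots[0] !is slots[1]);
    assert(slots[1] !is slots[2]);
}
```

With a non-lazy parameter, the expression would be evaluated once at the call site and every slot would refer to the same object.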
- void
stop
();
- Kills the pool immediately without waiting for jobs to finish. Use only if you
have waited on every job and therefore know there can't possibly be more
in queue, or if you speculatively executed a bunch of stuff and realized
you don't need those results anymore.
Note:
Does not affect jobs that are already executing, only those
in queue.
- void
waitStop
();
- Waits for all jobs to finish, then shuts down the pool.
- void
finish
();
- Instructs worker threads to stop when the queue becomes empty, but does
not block.
- final uint
size
();
- Returns the number of worker threads in the pool.
- void
put
(alias fun, Args...)(ref Task!(fun,Args) task);
- Put a task on the queue.
Note:
While this function takes the address of variables that may
potentially be on the stack, it is safe and will be marked as @trusted
once SafeD is fully implemented. Task objects include a destructor that
waits for the task to complete before destroying the stack frame that
they are allocated on. Therefore, it is impossible for the stack
frame to be destroyed before the task is complete and out of the queue.
- Task!(fun,Args)*
task
(alias fun, Args...)(Args args);
- Convenience method that automatically creates a Task calling an alias on
the GC heap and submits it to the pool. See examples for the
non-member function task().
Returns:
A pointer to the Task object.
- Task!(runCallable,TypeTuple!(F,Args))*
task
(F, Args...)(F delegateOrFp, Args args);
- Convenience method that automatically creates a Task calling a delegate,
function pointer, or functor on the GC heap and submits it to the pool.
See examples for the non-member function task().
Returns:
A pointer to the Task object.
Note:
This function takes a non-scope delegate, meaning it can be
used with closures. If you can't allocate a closure due to objects
on the stack that have scoped destruction, see the global function
task(), which takes a scope delegate.
- ParallelForeach!(R)
parallel
(R)(R range, size_t blockSize);
- Implements a parallel foreach loop over a range. blockSize is the
number of elements to process in one work unit.
Examples:
auto pool = new TaskPool();
uint[] squares = new uint[1_000];
foreach(i; pool.parallel( iota(0, squares.length), 100)) {
// Iterate over squares using work units of size 100.
squares[i] = i * i;
}
Note:
Since breaking from a loop that's being executed in parallel
doesn't make much sense, it is considered undefined behavior in this
implementation of parallel foreach.
- ParallelForeach!(R)
parallel
(R)(R range);
- Parallel foreach with default block size. For ranges that don't have
a length, the default is 512 elements. For ranges that do, the default
is whatever number would create exactly twice as many work units as
we have worker threads.
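The arithmetic behind the default block size for ranges with a length can be sketched as follows. The function sketchBlockSize is hypothetical. The ceiling division, the minimum block size of 1, and the requirement of at least one worker thread are assumptions of this sketch, since the text above does not say how the library rounds or how it handles a pool with zero threads.

```d
import std.stdio : writeln;

/// Hypothetical helper: block size that yields about 2 * workers work units.
/// Ceiling division and the minimum of 1 are assumptions of this sketch.
size_t sketchBlockSize(size_t rangeLength, size_t workers)
{
    assert(workers > 0, "sketch assumes at least one worker thread");
    immutable units = 2 * workers;
    immutable size = (rangeLength + units - 1) / units; // ceiling division
    return size == 0 ? 1 : size;
}

void main()
{
    // 1_000 elements on 3 worker threads: 6 work units of at most 167 elements.
    writeln(sketchBlockSize(1_000, 3));
}
```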
- MapType!(fun,R)[]
map
(alias fun, R, I : size_t)(R range, I blockSize, MapType!(fun,R)[] buf = null);
- Parallel map. Unlike std.algorithm.map, this is evaluated eagerly
because it wouldn't make sense to evaluate a parallel map lazily.
fun is the function to be evaluated, range is the range to evaluate
this function on. range must be a random access range with length
for now, though this restriction may be lifted in the future.
blockSize is the size of the work unit to submit to the thread pool,
in elements. buf is an optional buffer to store the results in.
If none is provided, one is allocated. If one is provided, it
must have the same length as range.
Examples:
auto pool = new TaskPool();
real[] numbers = new real[1_000];
foreach(i, ref num; numbers) {
num = i;
}
// Find the squares of numbers[] using work units of size 100.
real[] squares = pool.map!"a * a"(numbers, 100);
- MapType!(fun,R)[]
map
(alias fun, R)(R range, MapType!(fun,R)[] buf = null);
- Parallel map with default block size.
- ReduceType!(fun,R,E)
reduce
(alias fun, R, E)(E startVal, R range, size_t blockSize = 0);
- Parallel reduce. For now, the range must offer random access and have
a length. In the future, this restriction may be lifted.
Note:
Because this operation is being carried out in parallel,
fun must be associative. For notational simplicity, let # be an
infix operator representing fun. Then, (a # b) # c must equal
a # (b # c). This is NOT the same thing as commutativity. Matrix
multiplication, for example, is associative but not commutative.
Examples:
// Find the max of an array in parallel. Note that this is a toy example
// and unless the comparison function was very expensive, it would
// almost always be faster to do this in serial.
auto pool = new TaskPool();
auto myArr = somethingExpensiveToCompare();
auto myMax = pool.reduce!max(myArr);
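The associativity requirement from the note above can be checked with a serial sketch that reduces block by block and then reduces the per-block results, which is roughly the shape a parallel reduction takes. The helper chunkedReduce is hypothetical and runs in a single thread. It uses only std.algorithm and std.range, not the library itself.

```d
import std.algorithm : map, reduce;
import std.array : array;
import std.range : chunks;

/// Serial stand-in for a blocked reduction: reduce each block, then reduce
/// the per-block results. Hypothetical helper, not part of the library.
auto chunkedReduce(alias fun, T)(T[] data, size_t blockSize)
{
    auto partials = data.chunks(blockSize)
                        .map!(block => reduce!fun(block))
                        .array;
    return reduce!fun(partials);
}

void main()
{
    int[] nums = [1, 2, 3, 4, 5, 6, 7, 8];

    // Addition is associative: blocking does not change the result.
    assert(chunkedReduce!"a + b"(nums, 3) == reduce!"a + b"(nums));

    // Concatenation is associative but not commutative: still correct,
    // as long as the blocks are combined in order.
    string[] words = ["par", "al", "lel", "ism"];
    assert(chunkedReduce!"a ~ b"(words, 2) == "parallelism");

    // Subtraction is not associative: the blocked result differs.
    assert(chunkedReduce!"a - b"(nums, 3) != reduce!"a - b"(nums));
}
```

For subtraction the left-to-right result is -34, while the blocked version yields 4, which is why such a function is unsuitable for a parallel reduce.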
- ReduceType!(fun,R,ElementType!(R))
reduce
(alias fun, R)(R range, size_t blockSize = 0);
- Parallel reduce with the first element of the range as the start value.