parallelfuture

A library that implements various high-level parallelism primitives, such as parallel foreach over arbitrary ranges, parallel map and reduce, futures, and a task pool.

By:
David Simcha

Examples:
    // Create a TaskPool object with the default number of threads.
    auto poolInstance = new TaskPool();

    // Create some data to work on.
    uint[] numbers = new uint[1_000];

    // Fill in this array in parallel, using default block size.
    // Note:  Be careful when writing to adjacent elements of an array from
    // different threads, as this can cause word tearing bugs when
    // the elements aren't properly aligned or aren't the machine's native
    // word size.  In this case, though, we're ok.
    foreach(i; poolInstance.parallel( iota(0, numbers.length)) ) {
        numbers[i] = cast(uint) i;  // i is a size_t; cast explicitly for 64-bit builds.
    }

    // Make sure it works.
    foreach(i; 0..numbers.length) {
        assert(numbers[i] == i);
    }

    stderr.writeln("Done creating nums.");

    // Parallel foreach also works on non-random access ranges, albeit
    // less efficiently.
    auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
    foreach(num; poolInstance.parallel(myNumbers)) {
        assert(num % 7 > 0 && num < 1000);
    }
    stderr.writeln("Done modulus test.");

    // Use parallel map to calculate the square of each element in numbers,
    // and make sure it's right.
    uint[] squares = poolInstance.map!"a * a"(numbers, 100);
    assert(squares.length == numbers.length);
    foreach(i, number; numbers) {
        assert(squares[i] == number * number);
    }
    stderr.writeln("Done squares.");

    // Sum up the array in parallel with the current thread.
    auto sumFuture = poolInstance.task!( reduce!"a + b" )(numbers);

    // Go off and do other stuff while that future executes:
    // Find the sum of squares of numbers.
    ulong sumSquares = 0;
    foreach(elem; numbers) {
        sumSquares += elem * elem;
    }

    // Ask for our result.  If the pool has not yet started working on
    // this task, spinWait() automatically steals it and executes it in this
    // thread.
    uint mySum = sumFuture.spinWait();
    assert(mySum == 999 * 1000 / 2);

    // We could have also computed this sum in parallel using parallel
    // reduce.
    auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
    assert(mySum == mySumParallel);
    stderr.writeln("Done sums.");

    // Execute an anonymous delegate as a task.
    auto myTask = poolInstance.task({
        synchronized writeln("Our lives are parallel...Our lives are parallel.");
    });

    // Parallel foreach loops can also be nested:
    auto nestedOuter = iota('a', cast(char) ('d' + 1));
    auto nestedInner =  iota(0, 5);

    foreach(letter; poolInstance.parallel(nestedOuter, 1)) {
        foreach(number; poolInstance.parallel(nestedInner, 1)) {
            synchronized writeln(cast(char) letter, number);
        }
    }

    // Block until all jobs are finished and then shut down the thread pool.
    poolInstance.waitStop();


License:
Boost Software License - Version 1.0 - August 17th, 2003

Permission is hereby granted, free of charge, to any person or organization obtaining a copy of the software and accompanying documentation covered by this license (the "Software") to use, reproduce, display, distribute, execute, and transmit the Software, and to prepare derivative works of the Software, and to permit third-parties to whom the Software is furnished to do so, all subject to the following:

The copyright notices in the Software and this entire statement, including the above license grant, this restriction and the following disclaimer, must be included in all copies of the Software, in whole or in part, and all derivative works of the Software, unless such copies or derivative works are solely in the form of machine-executable object code generated by a source language processor.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

ReturnType!(F) runCallable (F, Args...)(F fpOrDelegate, Args args);
Calls a delegate or function pointer with args. This is basically an adapter that makes Task work with delegates, function pointers and functors instead of just aliases.

struct Task (alias fun,Args...);
A struct that encapsulates the information about a task, including its current status, what pool it was submitted to, and its arguments.

Note:
If a Task has been submitted to the pool, is being stored in a stack frame, and has not yet finished, the destructor for this struct will automatically call yieldWait() so that the task can finish and the stack frame can be destroyed safely.

ReturnType spinWait ();
If the task isn't started yet, steal it and do it in this thread. If it's done, return its return value, if any. If it's in progress, busy spin until it's done, then return the return value.

This function should be used when you expect the result of the task to be available relatively quickly, on a timescale shorter than that of an OS context switch.

ReturnType yieldWait ();
If the task isn't started yet, steal it and do it in this thread. If it's done, return its return value, if any. If it's in progress, wait on a condition variable.

This function should be used when you expect the result of the task to take a while, as waiting on a condition variable introduces latency, but results in negligible wasted CPU cycles.

ReturnType workWait ();
If this task is not started yet, steal it and execute it in the current thread. If it is finished, return its result. If it is in progress, execute any other available tasks from the task pool until this one is finished. If no other tasks are available, wait in the same way as yieldWait().
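The three wait functions differ only in what the calling thread does while a task is still in progress. The sketch below submits the same summation three times and collects each result with a different wait. It assumes the module is importable as parallelfuture (taken from the page title), and sumThreeWays is a hypothetical helper written for this illustration, not part of the library.

```d
import parallelfuture;           // assumed module name, taken from the page title
import std.algorithm : reduce;

// Hypothetical helper: sums `data` three times and collects each result
// with a different wait function.
uint[3] sumThreeWays(TaskPool pool, uint[] data) {
    // pool.task() heap-allocates each Task and submits it immediately.
    auto quick   = pool.task!( reduce!"a + b" )(data);
    auto lengthy = pool.task!( reduce!"a + b" )(data);
    auto helper  = pool.task!( reduce!"a + b" )(data);

    uint[3] results;
    results[0] = quick.spinWait();    // Busy spins if the task is in progress.
    results[1] = lengthy.yieldWait(); // Waits on a condition variable instead.
    results[2] = helper.workWait();   // Runs other queued tasks while waiting.
    return results;
}

void main() {
    auto pool = new TaskPool();

    uint[] data = new uint[1_000];
    foreach(i, ref x; data) {
        x = cast(uint) i;
    }

    // All three waits return the same value; only the waiting differs.
    foreach(r; sumThreeWays(pool, data)) {
        assert(r == 999 * 1000 / 2);
    }

    pool.waitStop();
}
```

For a task this small, spinWait() is probably the natural choice. yieldWait() and workWait() are more likely to pay off when a task runs long enough that spinning would waste noticeable CPU time.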

bool done ();


Task!(fun,Args) task (alias fun, Args...)(Args args);
Creates a task that calls an alias.

Examples:
 auto pool = new TaskPool();
 uint[] foo = someFunction();

 // Create a task to sum this array in the background.
 auto myTask = task!( reduce!"a + b" )(foo);
 pool.put(myTask);

 // Do other stuff.

 // Get value.  Steals the job and executes it in this thread if it
 // hasn't been started by a worker thread yet.
 writeln("Sum = ", myTask.spinWait());


Note:
This method of creating tasks allocates on the stack and requires an explicit submission to the pool. It is designed for tasks that are to finish before the function in which they are created returns. If you want to escape the Task object from the function in which it was created or prefer to heap allocate and automatically submit to the pool, see pool.task().

Task!(runCallable,TypeTuple!(F,Args)) task (F, Args...)(scope F delegateOrFp, Args args);
Create a task that calls a function pointer, delegate, or functor. This works for anonymous delegates.

Examples:
 auto pool = new TaskPool();
 auto myTask = task({
     stderr.writeln("I've completed a task.");
 });

 pool.put(myTask);

 // Do other stuff.

 myTask.yieldWait();


Notes:
This method of creating tasks allocates on the stack and requires an explicit submission to the pool. It is designed for tasks that are to finish before the function in which they are created returns. If you want to escape the Task object from the function in which it was created or prefer to heap allocate and automatically submit to the pool, see pool.task().

In the case of delegates, this function takes a scope delegate to prevent the allocation of closures, since its intended use is for tasks that will be finished before the function in which they're created returns. pool.task() takes a non-scope delegate and will allow the use of closures.

struct WorkerLocal (T);
Struct for creating worker-local storage. Worker-local storage is basically thread-local storage that exists only for workers in a given pool, is allocated on the heap in a way that avoids false sharing, and doesn't necessarily have global scope within any thread. It can be accessed from any worker thread in the pool that created it, or the thread that created the pool that created it. Accessing from other threads will result in undefined behavior.

Since the underlying data for this struct is heap-allocated, this struct has reference semantics when passed around.

At a more concrete level, the main use case for WorkerLocal is performing parallel reductions with an imperative, as opposed to functional, programming style. Therefore, it's useful to treat WorkerLocal as local to each thread for only the parallel portion of an algorithm.

Examples:
 auto pool = new TaskPool;
 auto sumParts = pool.createWorkerLocal!(uint)();
 foreach(i; pool.parallel(iota(0U, someLargeNumber))) {
     // Do complicated stuff.
     sumParts.get += resultOfComplicatedStuff;
 }

 writeln("Sum = ", reduce!"a + b"(sumParts.toRange));


@property T get ();
Get the current thread's instance. Returns by reference even though ddoc refuses to say so. Undefined behavior will result if the current thread is not a worker in the pool that created this instance or the thread that created the pool that created this instance.

If assertions are enabled and toRange() has been called, then this WorkerLocal instance is no longer worker-local and an assertion failure will result when calling this method. This is not checked when assertions are disabled for performance reasons.

@property void get (T val);
Assign a value to the current thread's instance. This function has the same caveats as its overload.

@property WorkerLocalRange!(T) toRange ();
Returns a range view of the values for all threads, which can be used to do stuff with the results of each thread after running the parallel part of your algorithm. Do NOT use this method in the parallel portion of your algorithm.

Calling this function will also set a flag indicating that this struct is no longer thread-local, and attempting to use the get() method again will result in an assertion failure if assertions are enabled.

struct WorkerLocalRange (T);
Range primitives for worker-local storage. The purpose of this is to access results produced by each worker thread from a single thread once you are no longer using the worker-local storage from multiple threads. Do NOT use this struct in the parallel portion of your algorithm.

@property T front ();


@property T back ();


void popFront ();


void popBack ();


@property typeof(this) save ();


T opIndex (size_t index);


void opIndexAssign (T val, size_t index);


@property bool empty ();


@property size_t length ();


class TaskPool ;
The task pool class that is the workhorse of this library.

this();
Default constructor that initializes a TaskPool with one worker thread per CPU core, minus one, because the thread that initialized the pool will also do work.

Note:
Initializing a pool with zero threads (as would happen in the case of a single-core CPU) is well-tested and does work.

this(size_t poolSize);
Constructor that initializes a TaskPool with a custom number of worker threads, poolSize.

final size_t workerIndex ();
Gets the index of the current thread relative to this pool. The thread that created this pool receives an index of 0. The worker threads in this pool receive indices of 1 through poolSize.

The worker index is useful mainly for maintaining worker-local storage.

BUGS:
Subject to integer overflow errors if more than size_t.max threads are ever created during the course of a program's execution. This will likely never be fixed because it's an extreme corner case on 32-bit and it's completely implausible on 64-bit.

Will silently return undefined results for threads that are not workers in this pool and did not create this pool.

WorkerLocal!(T) createWorkerLocal (T)(lazy T initialVal = T.init);
Create an instance of worker-local storage, initialized with a given value. The value is lazy so that you can, for example, easily create one instance of a class for each worker.
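As a sketch of the lazy initial value in use, the code below gives each worker its own instance of a class and merges the per-worker results afterwards. It assumes the module is importable as parallelfuture (taken from the page title). The Counter class and the countInParallel helper are hypothetical names introduced only for this illustration.

```d
import parallelfuture;           // assumed module name, taken from the page title
import std.range : iota;

// Hypothetical class used only for this illustration.
class Counter {
    uint hits;
}

// Hypothetical helper: counts n loop iterations using one Counter per worker.
uint countInParallel(TaskPool pool, uint n) {
    // Because initialVal is lazy, `new Counter` is intended to be evaluated
    // separately for each worker, so no two workers share an object.
    auto counters = pool.createWorkerLocal!(Counter)(new Counter);

    foreach(i; pool.parallel(iota(0U, n))) {
        counters.get.hits++;     // Touches only the current worker's Counter.
    }

    // The parallel part is over, so the range view can now be used to merge.
    uint total = 0;
    foreach(c; counters.toRange) {
        total += c.hits;
    }
    return total;
}

void main() {
    auto pool = new TaskPool();
    assert(countInParallel(pool, 10_000) == 10_000);
    pool.waitStop();
}
```

Had initialVal been an ordinary eager parameter, every worker would have received a reference to the same Counter, and the increments would have raced.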

void stop ();
Kills the pool immediately without waiting for jobs to finish. Use only if you have waited on every job and therefore know there can't possibly be more in the queue, or if you speculatively executed a bunch of stuff and realized you don't need those results anymore.

Note:
Does not affect jobs that are already executing, only those in queue.

void waitStop ();
Waits for all jobs to finish, then shuts down the pool.

void finish ();
Instructs worker threads to stop when the queue becomes empty, but does not block.

final uint size ();
Returns the number of worker threads in the pool.

void put (alias fun, Args...)(ref Task!(fun,Args) task);
Put a task on the queue.

Note:
While this function takes the address of variables that may potentially be on the stack, it is safe and will be marked as @trusted once SafeD is fully implemented. Task objects include a destructor that waits for the task to complete before destroying the stack frame that they are allocated on. Therefore, it is impossible for the stack frame to be destroyed before the task is complete and out of the queue.

Task!(fun,Args)* task (alias fun, Args...)(Args args);
Convenience method that automatically creates a Task calling an alias on the GC heap and submits it to the pool. See examples for the non-member function task().

Returns:
A pointer to the Task object.

Task!(runCallable,TypeTuple!(F,Args))* task (F, Args...)(F delegateOrFp, Args args);
Convenience method that automatically creates a Task calling a delegate, function pointer, or functor on the GC heap and submits it to the pool. See examples for the non-member function task().

Returns:
A pointer to the Task object.

Note:
This function takes a non-scope delegate, meaning it can be used with closures. If you can't allocate a closure due to objects on the stack that have scoped destruction, see the global function task(), which takes a scope delegate.

ParallelForeach!(R) parallel (R)(R range, size_t blockSize);
Implements a parallel foreach loop over a range. blockSize is the number of elements to process in one work unit.

Examples:
 auto pool = new TaskPool();

 uint[] squares = new uint[1_000];
 foreach(i; pool.parallel( iota(0, squares.length), 100)) {
     // Iterate over squares using work units of size 100.
     squares[i] = cast(uint) (i * i);
 }


Note:
Since breaking from a loop that's being executed in parallel doesn't make much sense, it is considered undefined behavior in this implementation of parallel foreach.

ParallelForeach!(R) parallel (R)(R range);
Parallel foreach with default block size. For ranges that don't have a length, the default is 512 elements. For ranges that do, the default is whatever number would create exactly twice as many work units as we have worker threads.
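The arithmetic behind the default for ranges with a length can be sketched as follows. This is an illustration of the rule as stated, not the library's actual code: sketchDefaultBlockSize and workUnits are hypothetical helpers, rounding up is an assumption, and so is the handling of a pool with zero worker threads.

```d
import std.stdio : writeln;

// Hypothetical helper, not part of the library: picks a block size that
// aims for twice as many work units as there are worker threads.
size_t sketchDefaultBlockSize(size_t rangeLength, size_t workerThreads) {
    // Assumption: with no workers (or nothing to do), use a single unit.
    if (workerThreads == 0 || rangeLength == 0) {
        return rangeLength;
    }
    immutable units = 2 * workerThreads;
    return (rangeLength + units - 1) / units;   // Ceiling division.
}

// Hypothetical helper: number of work units a given block size produces.
size_t workUnits(size_t rangeLength, size_t blockSize) {
    return blockSize == 0 ? 0 : (rangeLength + blockSize - 1) / blockSize;
}

void main() {
    // 1,000 elements and 4 worker threads: 8 units of 125 elements each.
    immutable blockSize = sketchDefaultBlockSize(1_000, 4);
    assert(blockSize == 125);
    assert(workUnits(1_000, blockSize) == 8);

    writeln("block size = ", blockSize,
            ", work units = ", workUnits(1_000, blockSize));
}
```

When the length does not divide evenly, or the range is very short, a rounding scheme like this one can yield fewer units than twice the worker count, so the numbers above should be read as the intent of the rule rather than a guarantee.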

MapType!(fun,R)[] map (alias fun, R, I : size_t)(R range, I blockSize, MapType!(fun,R)[] buf = null);
Parallel map. Unlike std.algorithm.map, this is evaluated eagerly because it wouldn't make sense to evaluate a parallel map lazily.

fun is the function to be evaluated, range is the range to evaluate this function on. range must be a random access range with length for now, though this restriction may be lifted in the future. blockSize is the size of the work unit to submit to the thread pool, in elements. buf is an optional buffer to store the results in. If none is provided, one is allocated. If one is provided, it must have the same length as range.

Examples:
 auto pool = new TaskPool();

 real[] numbers = new real[1_000];
 foreach(i, ref num; numbers) {
     num = i;
 }

 // Find the squares of numbers[] using work units of size 100.
 real[] squares = pool.map!"a * a"(numbers, 100);
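The optional buf parameter lets the caller supply the storage for the results, which may help when the same buffer is reused across calls. The sketch below assumes the module is importable as parallelfuture (taken from the page title). squaresInto is a hypothetical helper written for this illustration.

```d
import parallelfuture;           // assumed module name, taken from the page title

// Hypothetical helper: writes the squares of `numbers` into `buf`.
void squaresInto(TaskPool pool, real[] numbers, real[] buf) {
    // buf must have the same length as the input range.
    assert(buf.length == numbers.length);

    // Work units of size 100; results are stored in buf.
    pool.map!"a * a"(numbers, 100, buf);
}

void main() {
    auto pool = new TaskPool();

    real[] numbers = new real[1_000];
    foreach(i, ref num; numbers) {
        num = i;
    }

    // Allocate the output buffer once, up front.
    real[] buf = new real[numbers.length];
    squaresInto(pool, numbers, buf);

    foreach(i, sq; buf) {
        assert(sq == numbers[i] * numbers[i]);
    }

    pool.waitStop();
}
```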


MapType!(fun,R)[] map (alias fun, R)(R range, MapType!(fun,R)[] buf = null);
Parallel map with default block size.

ReduceType!(fun,R,E) reduce (alias fun, R, E)(E startVal, R range, size_t blockSize = 0);
Parallel reduce. For now, the range must offer random access and have a length. In the future, this restriction may be lifted.

Note:
Because this operation is being carried out in parallel, fun must be associative. For notational simplicity, let # be an infix operator representing fun. Then, (a # b) # c must equal a # (b # c). This is NOT the same thing as commutativity. Matrix multiplication, for example, is associative but not commutative.
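The associativity requirement can be checked serially, without the library. The sketch below uses only std.algorithm.reduce. blockedReduce is a hypothetical helper that reduces each block on its own and then combines the partial results in order, which is roughly the regrouping a parallel reduce performs. String concatenation is associative but not commutative and survives the regrouping. Subtraction is not associative and does not.

```d
import std.algorithm : reduce;

// Hypothetical helper: reduce each block separately, then reduce the
// partial results in their original order.
T blockedReduce(alias fun, T)(T[] data, size_t blockSize) {
    T[] partials;
    for(size_t lo = 0; lo < data.length; lo += blockSize) {
        immutable hi = lo + blockSize < data.length ? lo + blockSize
                                                     : data.length;
        partials ~= reduce!fun(data[lo .. hi]);
    }
    return reduce!fun(partials);
}

void main() {
    // Concatenation: ("a" ~ "b") ~ "c" equals "a" ~ ("b" ~ "c"),
    // so regrouping into blocks leaves the result unchanged.
    string[] words = ["a", "b", "c", "d", "e"];
    assert(reduce!"a ~ b"(words) == "abcde");
    assert(blockedReduce!"a ~ b"(words, 2) == "abcde");

    // Subtraction: ((10 - 3) - 2) - 1 is 4, but regrouping into blocks
    // of two gives (10 - 3) - (2 - 1), which is 6.
    int[] nums = [10, 3, 2, 1];
    assert(reduce!"a - b"(nums) == 4);
    assert(blockedReduce!"a - b"(nums, 2) == 6);
}
```

Note that blockedReduce keeps the blocks in order, which is why a non-commutative function such as concatenation still gives the serial answer.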

Examples:
 // Find the max of an array in parallel.  Note that this is a toy example
 // and unless the comparison function was very expensive, it would
 // almost always be faster to do this in serial.

 auto pool = new TaskPool();

 auto myArr = somethingExpensiveToCompare();
 auto myMax = pool.reduce!max(myArr);


ReduceType!(fun,R,ElementType!(R)) reduce (alias fun, R)(R range, size_t blockSize = 0);
Parallel reduce with the first element of the range as the start value.

Page was generated on Tue Jul 13 00:12:42 2010