Saturday, November 16, 2019

Digest for comp.programming.threads@googlegroups.com - 8 updates in 8 topics

Bonita Montero <Bonita.Montero@gmail.com>: Nov 16 07:36AM +0100

Here, Amine, is a generic parallel merge-sort algorithm as well as a parallel
quicksort algorithm. The merge-sort algorithm is faster than the Microsoft
STL's stable_sort, but the quicksort is slower than the MS STL's
implementation. So improve the latter to make it faster than the
MS implementation! I don't know how to make it any faster.
 
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdlib>
#include <exception>
#include <execution>
#include <functional>
#include <iterator>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
 
// Scope guard that runs a callable when it goes out of scope.
//
// The callable is invoked either early through invoke_and_disable() or, if
// that was never called successfully, from the destructor. Only a reference
// to the callable is stored, so the callable must outlive the guard.
template<typename T>
struct invoke_on_destruct
{
private:
    T &m_t;
    bool m_enabled;

public:
    // explicit: a callable must never convert silently into a guard
    explicit invoke_on_destruct( T &t ) :
        m_t( t ), m_enabled( true )
    {
    }

    // non-copyable: a copy would run the same callable a second time
    invoke_on_destruct( invoke_on_destruct const & ) = delete;
    invoke_on_destruct &operator =( invoke_on_destruct const & ) = delete;

    // Runs the callable unless the guard was disarmed before.
    ~invoke_on_destruct()
    {
        if( m_enabled )
            m_t();
    }

    // Runs the callable now and disarms the destructor.
    // The guard is disarmed only after the callable returned, so if the
    // callable throws, the destructor will invoke it once more.
    void invoke_and_disable()
    {
        m_t();
        m_enabled = false;
    }
};
 
// Parallel merge sort. Constructing an object sorts [start, end) in place.
//
// It        - iterator type of the range; the implementation subtracts
//             iterators and adds offsets to them
// Cmp       - ordering predicate, defaults to std::less of the element type
// Allocator - allocator for the scratch buffers; it is rebound to
//             std::exception_ptr for the list of collected exceptions
template<typename It,
    typename Cmp = std::less<typename std::iterator_traits<It>::value_type>,
    typename Allocator = std::allocator<typename std::iterator_traits<It>::value_type>>
class merge_sort
{
private:
    // Allocator rebound from the element type to std::exception_ptr
    using ep_allocator =
        typename std::allocator_traits<Allocator>::template rebind_alloc<std::exception_ptr>;
    // list of exceptions caught while sorting
    using exceptions_vector = std::vector<std::exception_ptr, ep_allocator>;

public:
    // Sorts [start, end) with cmp.
    // nThreads == 0 means "use the number of hardware threads".
    merge_sort( It start, It end, Cmp const &cmp = Cmp(), unsigned nThreads = 0,
        Allocator const &alloc = Allocator() );

    // Thrown when more than one exception was collected; begin()/end()
    // iterate over the individual std::exception_ptr objects.
    // Catch it by reference so the stored exception list is not copied.
    struct par_exception : public std::exception
    {
        using iterator = typename exceptions_vector::iterator;

        ~par_exception();
        iterator begin();
        iterator end();

    private:
        // only merge_sort may create a par_exception
        friend class merge_sort;
        par_exception( exceptions_vector &&exceptions );

        exceptions_vector m_exceptions;
    };

private:
    Cmp m_cmp;
    Allocator m_alloc;
    // guards m_exceptions against concurrent access from worker threads
    std::mutex m_excMtx;
    exceptions_vector m_exceptions;

    // splits the work across up to nThreads threads
    template<typename UpIt>
    void parRecursion( UpIt start, UpIt end, unsigned nThreads );

    // single-threaded merge sort of [start, end) using scratch space at buf
    template<typename UpIt, typename BufIt>
    void recursion( UpIt start, UpIt end, BufIt buf );

    // merges [leftBuf, rightBuf) and [rightBuf, bufEnd) into start
    template<typename UpIt, typename BufIt>
    void merge( UpIt start, BufIt leftBuf, BufIt rightBuf, BufIt bufEnd );
};
 
// Sorts [start, end) with cmp, using at most nThreads threads.
//
// nThreads == 0 selects the number of hardware threads; larger requests are
// capped to that number.
//
// Throws:
//  - the original exception, if exactly one exception occurred (no matter
//    whether it came from a worker thread or from the calling thread)
//  - par_exception, if several exceptions occurred
template<typename It, typename Cmp, typename Allocator>
merge_sort<It, Cmp, Allocator>::merge_sort( It start, It end, Cmp const &cmp,
    unsigned nThreads, Allocator const &alloc ) :
    m_cmp( cmp ),
    m_alloc( alloc ),
    m_exceptions( ep_allocator( alloc ) )
{
    // hardware_concurrency() may report 0 when the value is unknown
    unsigned hwThreads = std::thread::hardware_concurrency();
    if( hwThreads == 0 )
        hwThreads = 1;
    if( nThreads == 0 || nThreads > hwThreads )
        nThreads = hwThreads;
    // reserve up front so that recording an exception later on
    // does not have to allocate
    m_exceptions.reserve( nThreads );
    try
    {
        parRecursion( start, end, nThreads );
    }
    catch( ... )
    {
        // exception from the calling thread's own share of the work
        if( m_exceptions.empty() )
            // nothing else went wrong: pass it on unchanged
            throw;
        // workers failed as well: report everything together below
        m_exceptions.emplace_back( std::current_exception() );
    }
    // The rethrow happens outside of the try-block on purpose: inside it,
    // the handler above would catch the rethrown exception and record
    // it a second time.
    if( m_exceptions.empty() )
        return;
    if( m_exceptions.size() == 1 )
        // only one exception from the threads: rethrow it as it is
        std::rethrow_exception( m_exceptions.front() );
    // multiple exceptions: hand them over in one par_exception
    throw par_exception( std::move( m_exceptions ) );
}
 
// Single-threaded merge sort of [start, end).
//
// buf points at scratch space: the range is first copied to the front of
// buf, both halves of that copy are sorted recursively (using the scratch
// space behind the copy), and the sorted halves are merged back into
// [start, end).
template<typename It, typename Cmp, typename Allocator>
template<typename UpIt, typename BufIt>
void merge_sort<It, Cmp, Allocator>::recursion( UpIt start, UpIt end, BufIt buf )
{
    // zero or one element: already sorted
    if( end - start <= 1 )
        return;
    std::size_t const count = end - start;
    std::copy( start, end, buf );
    BufIt const middle = buf + count / 2;
    BufIt const stop = buf + count;
    // the right half reuses the scratch space the left half is done with
    this->recursion( buf, middle, stop );
    this->recursion( middle, stop, stop );
    this->merge( start, buf, middle, stop );
}
 
// Merges the two adjacent sorted runs [leftBuf, rightBuf) and
// [rightBuf, bufEnd) into the output range beginning at start.
//
// Either run may be empty. Elements that compare equivalent are taken from
// the left run first, so the merge keeps their relative order (stable).
template<typename It, typename Cmp, typename Allocator>
template<typename UpIt, typename BufIt>
void merge_sort<It, Cmp, Allocator>::merge( UpIt start, BufIt leftBuf,
    BufIt rightBuf, BufIt bufEnd )
{
    BufIt const leftBufEnd = rightBuf;
    UpIt wrtBack = start;
    // only enter the compare-loop if both runs have something to offer;
    // dereferencing an empty run would read out of range
    if( leftBuf != leftBufEnd && rightBuf != bufEnd )
        for( ; ; )
            // take the right element only if it is strictly smaller,
            // ties go to the left run
            if( m_cmp( *rightBuf, *leftBuf ) )
            {
                *wrtBack++ = *rightBuf;
                if( ++rightBuf == bufEnd )
                    break;
            }
            else
            {
                *wrtBack++ = *leftBuf;
                if( ++leftBuf == leftBufEnd )
                    break;
            }
    // at most one of the runs still holds elements: append them
    for( ; leftBuf != leftBufEnd; ++leftBuf )
        *wrtBack++ = *leftBuf;
    for( ; rightBuf != bufEnd; ++rightBuf )
        *wrtBack++ = *rightBuf;
}
 
// Sorts [start, end) using up to nThreads threads.
//
// With nThreads <= 1 the range is sorted sequentially by recursion().
// Otherwise the range is copied into a split-buffer, the two parts of the
// buffer are sorted concurrently (the left part always in a new thread, the
// right part in a new thread only if more than one thread is assigned to
// it) and finally merged back into [start, end).
//
// Exceptions escaping a worker thread are recorded in m_exceptions; if any
// exception has been recorded once the workers are joined, the merge step
// is skipped. Exceptions from the calling thread's own work propagate to
// the caller after the workers have been joined.
template<typename It, typename Cmp, typename Allocator>
template<typename UpIt>
void merge_sort<It, Cmp, Allocator>::parRecursion( UpIt start, UpIt end,
    unsigned nThreads )
{
    using T = typename std::iterator_traits<It>::value_type;
    std::size_t const n = end - start;
    // Nothing to do for zero or one element. This also guarantees that the
    // split below never yields an empty half for merge().
    if( n < 2 )
        return;
    if( nThreads <= 1 )
    {
        // calculate buffer-/stack-size: every recursion level needs room
        // for its own copy, and the larger half determines the next level
        std::size_t bs = 0;
        for( std::size_t split = n; split > 1; split -= split / 2 )
            bs += split;
        std::vector<T, Allocator> buf( m_alloc );
        buf.resize( bs );
        recursion( start, end, buf.begin() );
        return;
    }
    // split-buffer holding a copy of the input; it is declared before the
    // join-guard so that it outlives the worker threads
    std::vector<T, Allocator> buf( start, end, m_alloc );
    // array for left-recursion and right-recursion thread
    std::array<std::thread, 2> threads;
    // automatically join threads when an exception is thrown
    auto joinThreads = [&threads]()
    {
        for( std::thread &thr : threads )
            if( thr.joinable() )
                // try to join infinitely because the thread
                // will continue to access our buffer
                for( ; ; )
                    try
                    {
                        thr.join();
                        break;
                    }
                    catch( ... )
                    {
                    }
    };
    invoke_on_destruct<decltype(joinThreads)> iodJoin( joinThreads );
    // iterator-type for our split-buffer
    using BufIt = typename std::vector<T, Allocator>::iterator;
    // proxy thread-lambda for our new threads: no exception may leave
    // a thread, so remember it in our exception-array instead
    auto prProxy = [this]( BufIt subStart, BufIt subEnd, unsigned subThreads )
    {
        try
        {
            parRecursion( subStart, subEnd, subThreads );
        }
        catch( ... )
        {
            std::unique_lock<std::mutex> excLock( m_excMtx );
            m_exceptions.emplace_back( std::current_exception() );
        }
    };
    unsigned const rightThreads = nThreads / 2,
        leftThreads = nThreads - rightThreads;
    // if the number of threads is uneven, the left side gets one thread
    // more; give it proportionally more input
    std::size_t const left = static_cast<std::size_t>(
        n * ( static_cast<double>( leftThreads ) / nThreads ) );
    BufIt const leftBuf = buf.begin(),
        leftBufEnd = buf.begin() + left,
        rightBuf = leftBufEnd,
        bufEnd = buf.begin() + n;
    // start left thread
    threads[0] = std::thread( prProxy, leftBuf, leftBufEnd, leftThreads );
    if( rightThreads > 1 )
        // start right thread
        threads[1] = std::thread( prProxy, rightBuf, bufEnd, rightThreads );
    else
        // there's only one thread right, so we do it on our own
        parRecursion( rightBuf, bufEnd, 1 );
    // join threads
    iodJoin.invoke_and_disable();
    // stop if there are any exceptions from the threads; the lock is
    // needed because threads of other subtrees may still be recording
    {
        std::unique_lock<std::mutex> excLock( m_excMtx );
        if( !m_exceptions.empty() )
            return;
    }
    // merge split-buffer back into input-buffer
    this->merge( start, leftBuf, rightBuf, bufEnd );
}
 
// Nothing to clean up by hand: the exception list releases itself.
template<typename It, typename Cmp, typename Allocator>
merge_sort<It, Cmp, Allocator>::par_exception::~par_exception() = default;
 
// Takes over the list of collected exceptions by moving from it.
template<typename It, typename Cmp, typename Allocator>
merge_sort<It, Cmp, Allocator>::par_exception::par_exception( exceptions_vector &&exceptions )
    : m_exceptions( std::move( exceptions ) )
{
}
 
// Iterator to the first collected std::exception_ptr.
template<typename It, typename Cmp, typename Allocator>
auto merge_sort<It, Cmp, Allocator>::par_exception::begin() -> iterator
{
    return m_exceptions.begin();
}
 
// Iterator one past the last collected std::exception_ptr.
template<typename It, typename Cmp, typename Allocator>
auto merge_sort<It, Cmp, Allocator>::par_exception::end() -> iterator
{
    return m_exceptions.end();
}
 
#if defined(_MSC_VER)
#pragma warning(disable: 26444)

No comments: