- Task for Amine - 1 Update
- Look at my "sportive" spirit.. - 1 Update
- More about my way of doing.. - 1 Update
- Read the following part of a somewhat old book of O'Reilly - 1 Update
- More political philosophy about morality.. - 1 Update
- I will explain more my poem of: "You're In My Heart" - 1 Update
- My powerful Parallel Compression Library was updated to version 4.4 - 1 Update
- I will soon enhance much more my following powerful Parallel Sort Library - 1 Update
| Bonita Montero <Bonita.Montero@gmail.com>: Nov 16 07:36AM +0100 Here, Amine, that's a generic parallel merge-sort- as well as a parallel quicksort-algorithm. The merge-sort-algorithm is faster than the Micro- soft STL library stable_sort. But the quicksort is slower than the MS STL's implemenation. So improve the latter to make it faster than the MS-implementation! I don't know how to get it faster. #include <algorithm> #include <iterator> #include <cstdlib> #include <thread> #include <functional> #include <exception> #include <mutex> #include <memory> #include <execution> #include <array> template<typename T> struct invoke_on_destruct { private: T &m_t; bool m_enabled; public: invoke_on_destruct( T &t ) : m_t( t ), m_enabled( true ) { } ~invoke_on_destruct() { if( m_enabled ) m_t(); } void invoke_and_disable() { m_t(); m_enabled = false; } }; template<typename It, typename Cmp = std::less<typename std::iterator_traits<It>::value_type>, typename Allocator = std::allocator<typename std::iterator_traits<It>::value_type>> class merge_sort { private: using ep_allocator = typename std::allocator_traits<Allocator>::template rebind_alloc<std::exception_ptr>; using exceptions_vector = std::vector<std::exception_ptr, ep_allocator>; public: merge_sort( It start, It end, Cmp const &cmp = Cmp(), unsigned nThreads = 0, Allocator const &alloc = Allocator() ); struct par_exception : public std::exception { using iterator = typename std::vector<std::exception_ptr, ep_allocator>::iterator; // there's no copy-constructor because of internal vector // so catch par_exception only via reference ~par_exception(); iterator begin(); iterator end(); private: friend class merge_sort; par_exception( std::vector<std::exception_ptr, ep_allocator> &&exceptions ); std::vector<std::exception_ptr, ep_allocator> m_exceptions; }; private: Cmp m_cmp; Allocator m_alloc; // mutex that protects the exception-array std::mutex m_excMtx; exceptions_vector m_exceptions; template<typename UpIt> void parRecursion( UpIt start, UpIt end, unsigned nThreads ); template<typename UpIt, typename BufIt> void recursion( UpIt start, UpIt end, BufIt buf ); template<typename UpIt, typename BufIt> void merge( UpIt start, BufIt leftBuf, BufIt rightBuf, BufIt bufEnd ); }; template<typename It, typename Cmp, typename Allocator> merge_sort<It, Cmp, Allocator>::merge_sort( It start, It end, Cmp const &cmp, unsigned nThreads, Allocator const &alloc ) : m_cmp( cmp ), m_alloc( alloc ), m_exceptions( ep_allocator( alloc ) ) { using namespace std; // threads == 0 -> number of threads = hardware-threads unsigned hwThreads = thread::hardware_concurrency(); hwThreads += hwThreads == 0; nThreads = nThreads == 0 ? hwThreads : nThreads <= hwThreads ? nThreads : hwThreads; // reserve number of threads elements in the exception_ptr-vector // so that there will be no exception when we do emplace_back m_exceptions.reserve( nThreads ); try { parRecursion( start, end, nThreads ); if( m_exceptions.size() ) if( m_exceptions.size() > 1 ) // multiple exceptions from threads: throw par_exception throw par_exception( move( m_exceptions ) ); else // only one exception from threads: rethrow it rethrow_exception( m_exceptions[0] ); } catch( ... ) { if( m_exceptions.size() ) { // additional exception catched: throw par_exception m_exceptions.emplace_back( current_exception() ); throw par_exception( move( m_exceptions ) ); } else // single exception: rethrow it throw; } } template<typename It, typename Cmp, typename Allocator> template<typename UpIt, typename BufIt> void merge_sort<It, Cmp, Allocator>::recursion( UpIt start, UpIt end, BufIt buf ) { using namespace std; if( end - start <= 1 ) return; copy( start, end, buf ); size_t n = end - start; BufIt leftBuf = buf, leftBufEnd = buf + n / 2, rightBuf = leftBufEnd, bufEnd = buf + n; recursion( leftBuf, leftBufEnd, bufEnd ); recursion( rightBuf, bufEnd, bufEnd ); merge( start, leftBuf, rightBuf, bufEnd ); } template<typename It, typename Cmp, typename Allocator> template<typename UpIt, typename BufIt> void merge_sort<It, Cmp, Allocator>::merge( UpIt start, BufIt leftBuf, BufIt rightBuf, BufIt bufEnd ) { BufIt leftBufEnd = rightBuf; for( UpIt wrtBack = start; ; ) if( m_cmp( *leftBuf, *rightBuf ) ) { *wrtBack++ = *leftBuf; if( ++leftBuf == leftBufEnd ) { // faster for small number of elements than std::copy do *wrtBack++ = *rightBuf; while( ++rightBuf != bufEnd ); break; } } else { *wrtBack++ = *rightBuf; if( ++rightBuf == bufEnd ) { do *wrtBack++ = *leftBuf; while( ++leftBuf != leftBufEnd ); break; } } } template<typename It, typename Cmp, typename Allocator> template<typename UpIt> void merge_sort<It, Cmp, Allocator>::parRecursion( UpIt start, UpIt end, unsigned nThreads ) { using namespace std; using T = typename iterator_traits<It>::value_type; size_t n = end - start; if( nThreads <= 1 ) { vector<T, Allocator> buf( m_alloc );; size_t bs = 0; // calculate buffer-/stack-size for( size_t split = end - start; split > 1; bs += split, split -= split / 2 ); buf.resize( bs ); recursion( start, end, buf.begin() ); } else { // split-buffer vector<T, Allocator> buf( m_alloc );; buf.resize( n ); copy( start, end, buf.begin() ); // array for left-recursion and right-recursion thread array<thread, 2> threads; // automatically join threads when an exception is thrown auto joinThreads = [&threads]() { for( thread &thr : threads ) // try to join infinitely because the thread // will continue to access our buffer if( thr.get_id() != thread::id() ) for( ; ; ) try { thr.join(); break; } catch( ... ) { } }; invoke_on_destruct<decltype(joinThreads)> iodJoin( joinThreads ); // iterator-type for our split-buffer using BufIt = typename vector<T, Allocator>::iterator; // proxy thread-lambda for our new threads auto prProxy = [this]( BufIt start, BufIt end, unsigned nThreads ) { try { parRecursion( start, end, nThreads ); } catch( ... ) { // remember exception in our exception-array unique_lock<mutex> excLock( m_excMtx ); m_exceptions.emplace_back( current_exception() ); } }; unsigned rightThreads = nThreads / 2, leftThreads = nThreads - rightThreads; // if the left number of threads is uneven give the threads more input size_t left = (size_t)(n * ((double)leftThreads / nThreads)), right = n - left; BufIt leftBuf = buf.begin(), leftBufEnd = buf.begin() + left, rightBuf = leftBufEnd, bufEnd = buf.begin() + n; // start left thread threads[0] = move( thread( prProxy, leftBuf, leftBufEnd, leftThreads ) ); if( rightThreads > 1 ) // start right thread threads[1] = move( thread( prProxy, rightBuf, bufEnd, rightThreads ) ); else // there's only one thread right, so we do it on our own parRecursion( rightBuf, bufEnd, 1 ); // join threads iodJoin.invoke_and_disable(); // stop if there are any exceptions from the threads unique_lock<mutex> excLock( m_excMtx ); if( m_exceptions.size() ) return; excLock.unlock(); // merge split-buffer back into input-buffer merge( start, leftBuf, rightBuf, bufEnd ); } } template<typename It, typename Cmp, typename Allocator> merge_sort<It, Cmp, Allocator>::par_exception::~par_exception() { } template<typename It, typename Cmp, typename Allocator> merge_sort<It, Cmp, Allocator>::par_exception::par_exception( typename merge_sort<It, Cmp, Allocator>::exceptions_vector &&exceptions ) : m_exceptions( std::move( exceptions ) ) { } template<typename It, typename Cmp, typename Allocator> typename merge_sort<It, Cmp, Allocator>::par_exception::iterator merge_sort<It, Cmp, Allocator>::par_exception::begin() { return m_exceptions.begin(); } template<typename It, typename Cmp, typename Allocator> typename merge_sort<It, Cmp, Allocator>::par_exception::iterator merge_sort<It, Cmp, Allocator>::par_exception::end() { return m_exceptions.end(); } #if defined(_MSC_VER) #pragma warning(disable: 26444)
Subscribe to:
Post Comments (Atom)
|
No comments:
Post a Comment