Thursday, January 25, 2024

Digest for comp.lang.c++@googlegroups.com - 2 updates in 1 topic

Bonita Montero <Bonita.Montero@gmail.com>: Jan 25 06:25PM +0100

I once wrote a thread pool that has an upper limit on the number of
threads and a timeout after which idle threads end themselves. If you have
something userspace CPU-bound you'd specify the number of hardware threads
as the upper limit; if you have many threads doing I/O you may go far
beyond that, since the hardware threads aren't fully occupied anyway.
The problem with my initial thread pool class was that there may be
a large number of idle threads which could be used by other pools.
So I wrote a thread pool class where each pool has an upper limit on
the number of executing threads and there are no idle threads within
each pool. Instead, the threads idle in a global singleton pool
and attach to whichever pool needs a new thread, thereby minimizing
the total number of threads.
 
This is the implementation:
 
// header
 
#pragma once
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
 
/// A task pool with an upper bound on the number of concurrently executing
/// threads.
///
/// According to the description accompanying this code, a pool keeps no idle
/// threads of its own: worker threads idle in the shared `global` state and
/// attach to whichever pool needs one. All member functions are defined in the
/// translation unit, so remarks below that go beyond the signatures are marked
/// as assumptions.
struct thread_pool
{
    /// Type-erased nullary callable used for tasks and callbacks.
    using void_fn = std::function<void ()>;

    /// Constructs a pool; `maxThreads` defaults to 0.
    /// NOTE(review): the meaning of 0 is decided by the implementation - confirm.
    thread_pool( size_t maxThreads = 0 );

    // A pool can be neither copy-constructed nor copy-assigned.
    thread_pool( thread_pool const & ) = delete;
    void operator =( thread_pool const & ) = delete;

    ~thread_pool();

    /// Accepts a task as an rvalue and returns a 64-bit value.
    /// Presumably that value is the id cancel() expects - TODO confirm.
    uint64_t enqueue_task( void_fn &&task );

    /// Presumably removes the queued task with the given id and hands it back
    /// to the caller - TODO confirm.
    void_fn cancel( uint64_t queueId );

    /// Presumably blocks until the pool has nothing left to do - TODO confirm.
    void wait_idle();

    /// Presumably reports the current thread limit - TODO confirm.
    size_t max_threads();

    /// Presumably changes the thread limit; the meaning of the returned value
    /// is decided by the implementation - TODO confirm.
    size_t resize( size_t maxThreads );

    /// Presumably discards tasks that are still queued - TODO confirm what
    /// the returned bool signals.
    bool clear_queue();

    /// Takes a callback (empty by default) and returns a void_fn.
    /// Presumably installs a new idle callback and returns the previous
    /// one - TODO confirm.
    void_fn idle_callback( void_fn &&fn = {} );

    /// Returns a pair of counters.
    /// NOTE(review): which counter is which is decided by the
    /// implementation - confirm.
    std::pair<size_t, size_t> processing();

    /// Static, i.e. not tied to a single pool.
    /// Presumably sets global_t::m_timeout and returns the former
    /// value - TODO confirm.
    static std::chrono::milliseconds timeout( std::chrono::milliseconds timeout );

private:
    // Node of a singly linked list (see m_idleList).
    struct idle_node
    {
        idle_node *next;
        bool notify;
    };

    // A queued task together with its 64-bit id.
    using queue_item = std::pair<uint64_t, void_fn>;
    using task_queue_t = std::deque<queue_item>;

    // Per-pool state. The declaration order is kept as-is because it fixes
    // the member initialization order.
    bool m_quit;
    size_t m_maxThreads;
    size_t m_nThreadsExecuting;
    uint64_t m_lastIdleQueueId;
    uint64_t m_nextQueueId;
    task_queue_t m_queue;
    std::condition_variable m_idleCv;
    std::shared_ptr<void_fn> m_idleCallback;
    idle_node *m_idleList;

    // State shared by every pool: one mutex, the condition variables, the
    // thread counters and a queue of pools.
    struct global_t
    {
        std::mutex m_mtx;
        // Defaults to one second.
        std::chrono::milliseconds m_timeout = std::chrono::seconds( 1 );
        std::condition_variable m_cv;
        std::condition_variable m_quitCv;
        bool m_quit;
        size_t m_nThreads;
        size_t m_nThreadsActive;
        // Presumably the pools waiting for a thread to attach - TODO confirm.
        std::deque<thread_pool *> m_initiate;

        // Presumably the body run by each worker thread - TODO confirm.
        void theThread();
        global_t();
        ~global_t();
    };

    // The single shared instance; `inline static` so the header itself
    // provides the definition.
    inline static global_t global;

    void processIdle( std::unique_lock<std::mutex> &lock );
    std::unique_lock<std::mutex> waitIdle();
};
 
// translation unit
 
#include <cassert>
#include "thread_pool.h"
#include "invoke_on_destruct.h"
 
#if defined(_WIN32)
// Silence the MSVC code-analysis warnings about lock handling (C26110,
// C26111, C26115, C26117) and about using a moved-from object (C26800).
#pragma warning(disable: 26110) // Caller failing to hold lock 'lock' before calling function 'func'.
#pragma warning(disable: 26111) // Caller failing to release lock 'lock' before calling function 'func'.
#pragma warning(disable: 26115) // Failing to release lock 'lock' in function 'func'.
#pragma warning(disable: 26117) // Releasing unheld lock 'lock' in function 'func'.
#pragma warning(disable: 26800) // Use of a moved from object: 'object'.

No comments: