概要
通常线程池是同质的,每个线程都可以执行任意的task(每个线程中的task顺序执行),如下图所示:

但本文所介绍的线程池中,线程和task之间有绑定关系,例如A task只能跑在A thread上(因此称为异构线程池,每个线程的功能有所区别),如下图所示:

接口设计
TThreadPool接口设计
// TThread is the std::thread wrapper managed by the pool. Only pointers to it
// appear in this declaration, so a forward declaration is sufficient.
class TThread;

/// Thread pool whose threads are addressed by name ("heterogeneous" pool):
/// a task is always submitted to one specific, named thread.
///
/// Every public member function locks the pool's internal mutex.
/// The pool is neither copyable nor copy-assignable.
class TThreadPool
{
public:
    TThreadPool() {}
    TThreadPool(const TThreadPool&) = delete;
    TThreadPool& operator=(const TThreadPool& other) = delete;

    /// Destroys every thread still registered in the pool.
    ~TThreadPool();

    /// Creates a thread and registers it under `name`.
    /// @return false if a thread with that name already exists, true otherwise.
    bool add_thread(std::string name);

    /// Destroys the thread registered under `name` and removes it from the pool.
    /// @return false if no thread with that name exists, true otherwise.
    bool delete_thread(std::string name);

    /// Looks up a thread by name.
    /// @return the thread, or nullptr if the name is unknown. The pool keeps
    ///         ownership: the caller must not delete the returned object.
    TThread* get_thread_by_name(std::string threadname);

    /// Queues `task` on the thread registered under `threadname`.
    /// @return false if the name is unknown or the thread rejected the task.
    bool append_task_by_thread(const std::string threadname, const std::function<void()>& task);

private:
    std::mutex m_mutex;                        // guards m_threads
    std::map<std::string, TThread*> m_threads; // thread name -> owned thread object
}; 
TThreadPool类的主要功能是管理创建的线程(TThread,它是线程的具体实现),它提供了增加/删除线程的接口,同时给每个线程打上了标签(name)。
TThread接口设计
/// Wrapper around std::thread that owns one worker thread and the queue of
/// tasks that worker executes. Instances cannot be copied.
class TThread
{
public:
    /// Stores `name` and starts the worker thread.
    TThread(std::string name);

    TThread(const TThread& other) = delete;
    TThread& operator=(const TThread& other) = delete;

    /// Asks the worker to stop, waits for it to finish, then releases it.
    ~TThread();

    /// Appends one task to the queue and wakes the worker.
    /// @return false if the thread is stopping or the queue limit is reached.
    bool push_task(const std::function<void()>& task);

    /// Id of the underlying std::thread (intended for logging).
    std::thread::id get_thread_id();

    /// Sets the queue limit used by push_task; non-positive values are ignored.
    void set_max_task_size(int s);

    /// Number of tasks currently waiting in the queue.
    int get_task_size();

private:
    // Body of the worker thread: runs queued tasks until asked to stop.
    void work();

    // Clears the running flag and wakes the worker so that it can quit.
    void notify();

    std::string s_name;                         // thread name given at construction
    std::atomic<bool> b_running;                // false once a stop was requested
    std::thread* p_thread;                      // the worker thread, owned
    std::mutex m_mutex;                         // guards m_tasks
    std::queue<std::function<void()>> m_tasks;  // pending tasks, FIFO
    std::condition_variable m_cond;             // signalled on new task / stop
    std::atomic<int> i_maxTaskSize;             // queue limit checked by push_task
}; 
TThread类的主要功能是分配任务(push_task函数)和处理任务(work函数)。
代码实现
TThreadPool类
// Tears down the pool: destroys every registered thread, then empties the map.
TThreadPool::~TThreadPool() {
    // The pool mutex is held for the whole teardown.
    std::unique_lock<std::mutex> guard(m_mutex);
    for (auto& entry : m_threads) {
        if (entry.second == nullptr) {
            continue;
        }
        // Destroying a TThread stops its worker and joins it.
        delete entry.second;
    }
    m_threads.clear();
}
bool TThreadPool::add_thread(std::string name) {
    std::unique_lock<std::mutex> lk(m_mutex);
    if(m_threads.count(name)) {
        return false;
    }
    auto tt = new TThread(name);
    if(tt == nullptr) {
        return false;
    }
    m_threads[name] = tt;
    return true;
}
bool TThreadPool::delete_thread(std::string name) {
    std::unique_lock<std::mutex> lk(m_mutex);
    if(m_threads.count(name) == 0) {
        return false;
    }
    delete m_threads[name];
    m_threads.erase(name);
    return true;
}
// Returns the thread registered under `threadname`, or nullptr if there is
// none. The pool keeps ownership of the returned object.
TThread* TThreadPool::get_thread_by_name(std::string threadname) {
    std::unique_lock<std::mutex> guard(m_mutex);
    const auto found = m_threads.find(threadname);
    return found != m_threads.end() ? found->second : nullptr;
}
// Queues `task` on the thread registered under `threadname`.
// Returns false if the name is unknown; otherwise returns whatever the
// thread's push_task reports.
bool TThreadPool::append_task_by_thread(const std::string threadname, const std::function<void()>& task)
{
    // The pool mutex stays locked while the task is handed over.
    std::unique_lock<std::mutex> guard(m_mutex);
    const auto found = m_threads.find(threadname);
    if (found == m_threads.end()) {
        return false;
    }
    return found->second->push_task(task);
}
 
TThread类
const int MAX_TASK_SIZE = 100; // max task size in one thread
TThread::TThread(std::string name) : s_name(name)
                              , b_running(true)
                              , i_maxTaskSize(MAX_TASK_SIZE) {
    p_thread = new std::thread(&TThread::work, this);
}
// Stops the worker, waits for it to finish and frees the std::thread object.
TThread::~TThread() {
    notify(); // request the worker loop to exit

    std::thread* const worker = p_thread;
    if (worker->joinable()) {
        worker->join();
    }
    delete worker;
}
// Appends one task to this thread's queue and wakes the worker.
//
// Returns false, without queuing the task, when the thread has been asked to
// stop or when the queue already holds the configured maximum number of tasks.
bool TThread::push_task(const std::function<void()>& task) {
    std::unique_lock<std::mutex> lk(m_mutex);
    if (!b_running) {
        return false;
    }
    // `>=` keeps the queue at no more than the limit; comparing with `>`
    // would accept one task beyond it. The limit is cast to the queue's size
    // type so the comparison does not mix signed and unsigned operands.
    const int limit = i_maxTaskSize.load();
    if (m_tasks.size() >= static_cast<decltype(m_tasks.size())>(limit)) {
        return false;
    }
    m_tasks.push(task);
    m_cond.notify_one();
    return true;
}
// Returns the id of the underlying worker thread.
std::thread::id TThread::get_thread_id() {
    const std::thread::id worker_id = p_thread->get_id();
    return worker_id;
}
// Updates the queue limit; zero and negative values are ignored.
void TThread::set_max_task_size(int s) {
    if (s > 0) {
        i_maxTaskSize.store(s);
    }
}
// Body of the worker thread.
//
// Repeatedly takes the oldest queued task and runs it outside the lock. When
// the queue is empty the worker sleeps on the condition variable until a task
// is pushed or a stop is requested. The loop ends as soon as the running flag
// is cleared, even if tasks are still queued.
void TThread::work() {
    std::cout << std::this_thread::get_id() << " begin work thread" << std::endl;
    while (b_running) { // quits even if tasks remain
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lk(m_mutex);
            if (!m_tasks.empty()) {
                // Take the callable out of the queue without copying it.
                task.swap(m_tasks.front());
                m_tasks.pop();
            } else if (b_running) {
                // Nothing to do: sleep until notified. A spurious wake-up is
                // harmless because the loop re-checks the queue.
                m_cond.wait(lk);
            }
        }
        if (task) {
            // An exception leaving a thread's entry function terminates the
            // whole process, so a failing task is reported and skipped.
            try {
                task(); // do the task
            } catch (...) {
                std::cerr << std::this_thread::get_id()
                          << " task threw an exception, ignored" << std::endl;
            }
        }
    }
    std::cout << std::this_thread::get_id() << " end work thread"  << std::endl;
}
// Requests the worker to quit: clears the running flag and wakes the worker.
void TThread::notify() {
    // The flag is changed while the queue mutex is held; the mutex is
    // released when `guard` goes out of scope at the end of the function.
    std::unique_lock<std::mutex> guard(m_mutex);
    b_running.store(false);
    m_cond.notify_one();
}
// Returns how many tasks are currently waiting in the queue.
int TThread::get_task_size() {
    std::unique_lock<std::mutex> guard(m_mutex);
    const auto pending = m_tasks.size();
    return static_cast<int>(pending);
}
使用方式
有两种方式可以向指定的线程提交任务:
公共代码
// Demo task: prints its argument, then blocks for two seconds to simulate
// real work.
void func1(int i) {
    const unsigned int simulated_work_seconds = 2;
    std::cout << "into func1: " << i << std::endl;
    sleep(simulated_work_seconds);
}
    // Create the pool and register the two named threads used by the examples.
    TThreadPool thread_pool;
    thread_pool.add_thread("vdr"); // start the "vdr" thread
    thread_pool.add_thread("xgb"); // start the "xgb" thread
方式一、(先获取线程对象,然后对该线程对象添加任务)
auto tt = thread_pool.getThreadByName("vdr");
tt->push_task(std::bind(func1, 2));
tt->push_task(std::bind(func1, 5)); 
方式二、(直接通过线程池给对应线程添加任务)
thread_pool.append_task_by_thread("vdr", std::bind(func1, 2));
thread_pool.append_task_by_thread("vdr", std::bind(func1, 5)); 
注:
task是std::function<void()>类型。上面的demo使用普通函数实现,真实场景中通常是类的成员函数(注意:被绑定的对象必须在任务执行完之前保持有效),实现如下:
// Demo class whose member function is submitted to the pool as a task.
class A {
public:
    // Prints the given string with a fixed prefix, followed by a newline.
    void func(std::string str) {
        const std::string line = "into A func: " + str;
        std::cout << line << std::endl;
    }
};
    A a;
    thread_pool.append_task_by_thread("vdr", std::bind(&A::func, &a, "2"));
    thread_pool.append_task_by_thread("vdr", std::bind(&A::func, &a, "5")); 
                


















