189 8069 5689

C++11线程池及其三种优化-创新互联

文章目录
    • 1. C++11 简单线程池
    • 2. 优化1 - 支持任意类型任务
    • 3. 优化2 - 支持可变参数
    • 4. 优化3 - 通过future获取任务函数的返回值
    • 5. 总结

 在看这篇文章之前,请先确保大致了解过线程池的基本实现及其原理。如果不太了解,可以先去阅读一下这篇文章: pthread线程池的实现

专注于为中小企业提供成都网站设计、成都网站建设服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业密山免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了上千家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。1. C++11 简单线程池

 下面的线程池是基于C++11的线程池。编写的思路跟前文的pthread实现的线程池基本一致。

#include#include#include#include#include#include#include   

templateclass ThreadPool
{public:
	ThreadPool(size_t thread_nums = std::thread::hardware_concurrency())
		:_thread_nums(thread_nums)
		, _stop(false)
	{for (size_t i = 0; i< _thread_nums; ++i)
		{	_vt.emplace_back(std::thread(&ThreadPool::LoopWork, this));//将this指针也传过去
		}
	}

	//禁用拷贝构造和operator=  
	ThreadPool(const ThreadPool &) = delete;
	ThreadPool& operator=(const ThreadPool &) = delete;

private:
	//线程的执行函数  
	void LoopWork()
	{std::unique_lockul(_mtx);
		for (;;)
		{	while (_taskQueue.empty() && !_stop)
			{		_isEmpty.wait(ul);
			}

			//线程从wait中出来有2种情况: 1.有任务了 2.线程池stop为true
			if (_taskQueue.empty()) {		ul.unlock();	//退出前要先解锁
				break;
			}
			else {		T* task = std::move(_taskQueue.front());
				_taskQueue.pop();
				ul.unlock();
				(*task)();	//任务类需要重载operator()()
				ul.lock();
			}
		}
	}

public:
	void PushTask(T& task)
	{//通过{}控制lock_guard的作用域和生命周期
		{	std::lock_guardlg(_mtx);
			_taskQueue.push(&task); //任务队列是临界资源(其他地方会修改)  
		}
		_isEmpty.notify_one(); //条件变量的通知并不会因为多线程而影响结果(因此可以不加锁)
	}


	~ThreadPool()
	{_stop = true;
		_isEmpty.notify_all();
		for (size_t i = 0; i< _thread_nums; ++i)
		{	if (_vt[i].joinable()) {		_vt[i].join();
			}
		}
	}

private:
	size_t _thread_nums;
	std::vector_vt;
	std::queue_taskQueue;
	std::mutex _mtx;
	std::condition_variable _isEmpty;
	std::atomic_stop;
};

struct Task {Task(int x, int y)
		:_x(x),
		 _y(y)
	{}

	void operator()() {std::cout<< _x<< " + "<< _y<< " = "<< _x + _y<< std::endl;
	}

	int _x;
	int _y;
};

int main()
{std::shared_ptr>tp(new ThreadPool());
	Task t(1, 2);
	tp->PushTask(t);
}

输出结果:

1 + 2 = 3

2. 优化1 - 支持任意类型任务

 在前文的pthread线程池以及根据pthread改写的C++11线程池都是基于模板实现的。因为线程池需要能够接收不同类型的任务。但是将整个ThreadPool类设为模板其实不是最优解,因为当ThreadPool需要处理其它类型的任务时,还需要再实例化出一个新的ThreadPool类。它并没有实现一个线程池实例接收多种类型任务的功能。它实现的是多个线程池实例接收多种类型任务,每个线程池本质是只处理一种任务。

 为了解决上述问题,我们不应该将整个ThreadPool都设为模板类,我们的思路是让任务队列能够存放不同类型的任务。这里使用到了C++11中的function包装器,让任务队列里存放function类型的任务。而在PushTask()方法里,我们需要让该函数能够接收任意类型的任务,因此需要单独将PushTask()方法设为模板函数。

#include#include#include#include#include#include#include   

class ThreadPool
{public:
	ThreadPool(size_t thread_nums = std::thread::hardware_concurrency())
		:_thread_nums(thread_nums)
		, _stop(false)
	{for (size_t i = 0; i< _thread_nums; ++i)
		{	_vt.emplace_back(std::thread(&ThreadPool::LoopWork, this));//将this指针也传过去
		}
	}

	//禁用拷贝构造和operator=  
	ThreadPool(const ThreadPool &) = delete;
	ThreadPool& operator=(const ThreadPool &) = delete;

private:
	//线程的执行函数  
	void LoopWork()
	{std::unique_lockul(_mtx);
		for (;;)
		{	while (_taskQueue.empty() && !_stop)
			{		_isEmpty.wait(ul);
			}

			//线程从wait中出来有2种情况: 1.有任务了 2.线程池stop为true
			if (_taskQueue.empty()) {		ul.unlock();	//退出前要先解锁
				break;
			}
			else {		auto task = std::move(_taskQueue.front());
				_taskQueue.pop();
				ul.unlock();
				task();
				ul.lock();
			}
		}
	}

public:
	templatevoid PushTask(F&& task)
	{{	std::lock_guardlg(_mtx);
			_taskQueue.push(std::forward(task)); //任务队列是临界资源(其他地方会修改)  
		}
		_isEmpty.notify_one(); //条件变量的通知并不会因为多线程而影响结果(因此可以不加锁)
	}


	~ThreadPool()
	{_stop = true;
		_isEmpty.notify_all();
		for (size_t i = 0; i< _thread_nums; ++i)
		{	if (_vt[i].joinable()) {		_vt[i].join();
			}
		}
	}

private:
	size_t _thread_nums;
	std::vector_vt;
	std::queue>_taskQueue;//每个任务必须保证是: void返回值、无参
	std::mutex _mtx;
	std::condition_variable _isEmpty;
	std::atomic_stop;
};

• 优化1 - 测试

std::mutex gb_mtx;
void Add(int x, int y)
{std::lock_guardlg(gb_mtx);	//为了保证输出结果不会打印错乱
	std::cout<< x<< " + "<< y<< " = "<< x + y<< std::endl;
}

int main()
{std::shared_ptrtp(new ThreadPool());
	for (int i = 0; i< 10; ++i)
	{auto f = bind(Add, i, i + 1);	//我们需要手动绑定一下函数参数
		tp->PushTask(f);
	}
	return 0;
}

输出结果:

0 + 1 = 1
1 + 2 = 3
2 + 3 = 5
3 + 4 = 7
4 + 5 = 9
5 + 6 = 11
6 + 7 = 13
7 + 8 = 15
8 + 9 = 17
9 + 10 = 19

3. 优化2 - 支持可变参数

 前一个版本的线程池是支持任意类型的任务的,但是我们在使用线程池的时候必须要手动bind一个函数对象,然后再传过去(这是因为任务队列要求任务必须是void返回值且无参数)。为了优化这个问题,我们可以使用可变参数模板+bind解决。下面给出PushTask()方法的优化:(除了PushTask, 其它地方没有任何修改)

templatevoid PushTask(F&& task, Args&&... args)
	{//将可变参数包装起来, 包装后func的类型满足了"void返回值+无参"
		auto func = std::bind(std::forward(task), std::forward(args)...);
		{	std::lock_guardlg(_mtx);
			_taskQueue.push(std::forward(func)); //传入我们包装好的函数对象
		}
		_isEmpty.notify_one(); //条件变量的通知并不会因为多线程而影响结果(因此可以不加锁)
	}

• 优化2 - 测试

std::mutex gb_mtx;
void Add(int x, int y)
{std::lock_guardlg(gb_mtx);	//为了保证输出结果不会打印错乱
	std::cout<< x<< " + "<< y<< " = "<< x + y<< std::endl;
}

int main()
{std::shared_ptrtp(new ThreadPool());
	for (int i = 0; i< 10; ++i)
	{//auto f = bind(Add, i, i + 1);
		//tp->PushTask(f);
		tp->PushTask(Add, i, i + 1);	//我们不需要手动绑定了, 直接传参即可
	}
	return 0;
}

 输出结果与测试1的相同。

4. 优化3 - 通过future获取任务函数的返回值

 在优化2的基础上,为了能够接收任务函数的返回值,并且还不能让线程阻塞。这里需要使用线程异步。我们在PushTask()中需要返回一个future类型的对象。下面是对PushTask()方法的修改。(除了PushTask, 其它地方没有任何修改)

注意: 记得包含一下头文件。

templateauto PushTask(F&& task, Args&&... args) ->std::future{//将可变参数包装起来
		auto func = std::bind(std::forward(task), std::forward(args)...);
		auto task_ptr = std::make_shared>(func); //(1)
		std::functionwrapper_func = [task_ptr] {	(*task_ptr)();
		};//(2)

		{	std::lock_guardlg(_mtx);
			_taskQueue.push(wrapper_func); 
		}

		_isEmpty.notify_one(); //条件变量的通知并不会因为多线程而影响结果(因此可以不加锁)
		return task_ptr->get_future();//(3)
	}

 在优化2的基础上,我们从(1)开始看。我们使用make_shared构造了一个share_ptr对象, 方便我们管理。这个智能指针的类型是package_task。详细介绍看这里: C++11 线程异步。这个package_task中包装的函数对象的类型为decltype(task(args...))()。这个类型的含义为: 该函数对象的返回值的类型为task(args...),函数参数为空。

 (2)是对智能指针再次进行了一层封装,目的为了能够将其作为参数传给任务队列。实际上这里可以省略不写的,然后更改_taskQueue.push(wrapper_func);

_taskQueue.push([task_ptr]{(*task_ptr)();
	});

 (3)返回智能指针管理的package_task的future对象。

• 优化3 - 测试

int Add(int x, int y)
{return x + y;
}

int main()
{std::shared_ptrtp(new ThreadPool());
	std::vector>res;	//future保存的就是函数的返回值
	for (int i = 0; i< 10; ++i)
	{res.push_back(tp->PushTask(Add, i, i + 1));
	}

	for (auto& e : res)
	{std::cout<< e.get()<< std::endl;
	}
	return 0;
}

输出结果:

1
3
5
7
9
11
13
15
17
19

5. 总结

 我们实现的pthread的线程池是基于模板的,因此当有不同类型的任务想要添加到线程池处理时,我们就必须额外创建一个对应类型的线程池对象,这个过程往往不是我们想看到的。为了解决这种问题,我们给出了三种递进式优化方案,每种都是基于前面一种的基础上作的进一步优化。

 优化1: 支持任意类型的任务函数。它的本质是让任务队列能够接收任意类型的任务函数,我们这里使用了function作为任务队列的类型,也就是说我们在PushTask()时需要事前包装好任务函数,保证其类型为void()(返回值为void, 参数为空)

 优化2: 支持可变参数。这是通过可变参数模板+bind实现的。我们允许使用者直接传任务函数以及它的任意个参数。我们需要在push任务前包装好任务函数,保证任务函数为void()类型。而如何包装? 这里就需要使用bind进行包装了。
auto func = std::bind(std::forward(task), std::forward(args)...);

 优化3: 通过future获取任务函数的返回值。这是通过线程异步实现的。获取子线程执行完毕的函数的返回值的方法就是使用异步。因此我们将func任务函数 包装到了package_task任务包当中,package_task中保存了future对象。这里我们使用了智能指针去管理package_task。由于我们最终的目的是需要将一个类型为void()的任务添加到任务队列。因此这里我们使用了lambda表达式对智能指针再次进行了一层封装,里面包装了智能指针去调用任务函数的过程。lambada表达式的类型就是void(),因此我们可以将其添加到任务队列当中。后续取出任务后,直接像调用函数一样执行任务即可。(因为任务队列当中存的都是一个个的function函数对象, 执行函数对象的方法就是使用operator())

杂谈:

 个人觉得能够掌握前两种优化就够了。第三种相对比较难懂一些,并且没有太大的必要。我们之所以会实现第三种是因为想要获取任务函数的返回值! 因为我们在优化2的状态下也能够获取到任务的返回值。我们可以传入一个输出型参数即可。示例如下:

void Add(int x, int y, int& res)
{res = x + y;
}

int main()
{std::shared_ptr>tp(new ThreadPool());
	int res;
	tp->PushTask(Add, 1, 2, std::ref(res));	//注意, 这里必须要使用``ref()``引用传参!!!
	std::cout<< res<< std::endl;
	return 0;
}

输出结果: 3

注: 还有很多人会将taskQueue任务队列封装成一个类*(SafeQueue)*,然后封装任务队列的各种接口,将各种加锁、解锁操作都封装到了该类里面。外界在使用该类时,不需要再额外考虑锁的问题了。

参考:

Github: mtrebi/thread-pool: Thread pool implementation using c++11 threads

https://zhuanlan.zhihu.com/p/367309864

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


网站名称:C++11线程池及其三种优化-创新互联
本文URL:http://cdxtjz.com/article/dihijc.html

其他资讯