当前位置:主页 > 软件编程 > C代码 >

C++实现线程池的简单方法示例

时间:2020-11-07 00:00:18 | 栏目:C代码 | 点击:

最近自己写了一个线程池。

总的来说,线程池就是有一个任务队列,一个线程队列,线程队列不断地去取任务队列中的任务来执行,当任务队列中为空时,线程阻塞等待新的任务添加过来。

我是用queue来存放任务,vector存放thread*,然后用condition_variable 来设置线程阻塞和唤醒。

下面直接上代码吧。

线程池类头文件Thread_Pool.h

/********************************************
   线程池头文件

  Author:十面埋伏但莫慌
  Time:2020/05/03

*********************************************/
#pragma once
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include<thread>
#include<queue>
#include<mutex>
#include<atomic>
#include<vector>
#include<condition_variable>

typedef std::function<void()> Func;//定义线程执行函数类型,方便后面编码使用。
//任务类
class Task {
public:
 Task() {}
 ~Task() {}
 int push(Func func);//添加任务;
 int getTaskNum();//获得当前队列中的任务数;
 Func pop();//取出待执行的任务;
public:
 std::mutex mx;//锁;
private:
 
 std::queue<Func> tasks;//任务队列
};
//线程池类
class Thread_Pool {
public:
 Thread_Pool() :IsStart(false) {}
 ~Thread_Pool();
 int addTasks(Func tasks);//添加任务;
 void start();//开启线程池;
 void stop();//关闭线程池;
 void run();//线程工作函数;
 int getTaskNum();//获得当前队列中的任务数;
private:
 static const int maxThreadNum = 3;//最大线程数为3;

 std::condition_variable cond;//条件量;
 std::vector<std::thread*> threads;//线程向量;
 std::atomic<bool> IsStart;//原子变量,判断线程池是否运行;
 Task tasks;//任务变量;
};
#endif

然后是线程池类成员函数定义文件Thread_Pool.cpp

/********************************************
   线程池CPP文件

  Author:十面埋伏但莫慌
  Time:2020/05/03

*********************************************/
#include"Thread_Pool.h"
#include<iostream>
int Task::push(Func func) {
 std::unique_lock<std::mutex> lock(mx);
 try {
  tasks.emplace(func);
 }
 catch (std::exception e)
 {
  throw e;
  return -1;
 }
 return 0;
}
int Task::getTaskNum()
{
 return tasks.size();
}
Func Task::pop() {
 std::unique_lock<std::mutex> lock(mx);
 Func temp;
 if (tasks.empty())
  return temp;
 else
 {
  temp = tasks.front();
  tasks.pop();
  return temp;
 }
}
int Thread_Pool::addTasks(Func func)
{
 
 int ret = tasks.push(func);
 cond.notify_one();
 return ret;
}
void Thread_Pool::start() {
 if (!IsStart) {
  IsStart = true;
  for (int i = 0; i < maxThreadNum; i++)
  {
   threads.emplace_back(new std::thread(std::bind(&Thread_Pool::run,this)));   
  }
  
 }
}
void Thread_Pool::run()
{
 while (IsStart)
 {
  Func f;
  if (tasks.getTaskNum() == 0 && IsStart)
  {
   std::unique_lock<std::mutex> lock(tasks.mx);
   cond.wait(lock);
  }
  if (tasks.getTaskNum() != 0 && IsStart)
  {
   f = tasks.pop();
   if(f)
    f();
  }

 }
}
int Thread_Pool::getTaskNum() {
 return tasks.getTaskNum();
}
void Thread_Pool::stop() {

  IsStart = false;
  cond.notify_all();
  for (auto T : threads) {
   std::cout << "线程 " << T->get_id() << " 已停止。" << std::endl;
   T->join();
   if (T != nullptr)
   {
    delete T;
    T = nullptr;
   }
  }
 std::cout << "所有线程已停止。" << std::endl;
}
Thread_Pool::~Thread_Pool() {
 if (IsStart)
 {
  stop();
 }
}

最后是测试用的main.cpp

#include<iostream>
#include"Thread_Pool.h"
using namespace std;
void string_out_one() {
 cout << "One!" << endl;
}
void string_out_two() {
 cout << "Two!" << endl;
}
void string_out_three() {
 cout << "Three!" << endl;
}
int main() {
 {
  Thread_Pool Pool;
  try {
   Pool.start();
  }
  catch (std::exception e)
  {
   throw e;
   cout << "线程池创建失败。" << endl;
  }
  for (int i = 0; i < 50000 ;)
  {  
   if (Pool.getTaskNum() < 1000) {
    Pool.addTasks(string_out_one);
    Pool.addTasks(string_out_two);
    Pool.addTasks(string_out_three);
    std::cout << i++ << std::endl;
   }
  }
  getchar();
 }
 getchar();
 return 0;
}

执行的效果如下:

线程唤醒和阻塞的逻辑就是在线程工作函数run函数中,判断队列是否为空,若为空则设置锁并调用condition变量的wait函数,释放这个线程中的锁并阻塞线程,等待任务队列中新的任务添加进来后,

condition变量通过notify_one()随机唤醒一个在wait的线程,取出队列中的任务执行。

写这个线程池的过程中碰到的最主要需要注意的就是锁的使用,在对队列的写和释放时要注意加锁,在需要阻塞线程时,要注意通过{}设置锁的范围。

IsStart是原子的,所以在写这个变量的时候没有另外加锁。

目前我觉得这个线程池的缺陷就是可执行函数的类型被写死了,有尝试对Task类使用模板类,但是在Thread_Pool中还是要指明Task模板类的类型参数,要是有大神指点下就好了- -。

就先记录这么多,感觉这个线程池的还是有很多可以改进的地方的,也欢迎大家指出不足。

您可能感兴趣的文章:

相关文章