一个简单的linux线程池

更新日期:2021-10-15

来源:纯净之家


系统大全为您提供
 
线程池:简单地说,线程池 就是预先创建好一批线程,方便、快速地处理收到的业务。比起传统的到来一个任务,即时创建一个线程来处理,节省了线程的创建和回收的开销,响应更快,效率更高。
  在linux中,使用的是posix线程库,首先介绍几个常用的函数:
  1 线程的创建和取消函数
  pthread_create
  创建线程
  pthread_join
  合并线程
  pthread_cancel
  取消线程
  2 线程同步函数
  pthread_mutex_lock
  pthread_mutex_unlock
  pthread_cond_signal
  pthread_cond_wait
  关于函数的详细说明,参考man手册
  线程池的实现:
  线程池的实现主要分为三部分,线程的创建、添加任务到线程池中、工作线程从任务队列中取出任务进行处理。
  主要有两个类来实现,CTask,CThreadPool
  /**
  执行任务的类,设置任务数据并执行
  **/
  C代码
  class CTask
  {
  protected:
  string m_strTaskName;  //任务的名称
  void* m_ptrData;       //要执行的任务的具体数据
  public:
  CTask(){}
  CTask(string taskName)
  {
  this->m_strTaskName = taskName;
  m_ptrData = NULL;
  }
  virtual int Run()= 0;
  void SetData(void* data);    //设置任务数据
  };
  任务类是个虚类,所有的任务要从CTask类中继承 ,实现run接口,run接口中需要实现的就是具体解析任务的逻辑。m_ptrData是指向任务数据的指针,可以是简单数据类型,也可以是自定义的复杂数据类型。
  线程池类
  /**
  线程池
  **/
  Java代码
  class CThreadPool
  {
  private:
  vector<CTask*> m_vecTaskList;         //任务列表
  int m_iThreadNum;                            //线程池中启动的线程数
  static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合
  static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合
  static pthread_mutex_t m_pthreadMutex;    //线程同步锁
  static pthread_cond_t m_pthreadCond;    //线程同步的条件变量
  protected:
  static void* ThreadFunc(void * threadData); //新线程的线程函数
  static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中
  static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去
  int Create();          //创建所有的线程
  public:
  CThreadPool(int threadNum);
  int AddTask(CTask *task);      //把任务添加到线程池中
  int StopAll();
  };
  当线程池对象创建后,启动一批线程,并把所有的线程放入空闲列表中,当有任务到达时,某一个线程取出任务并进行处理。
  线程之间的同步用线程锁和条件变量。
  这个类的对外接口有两个:
  AddTask函数把任务添加到线程池的任务列表中,并通知线程进行处理。当任务到到时,把任务放入m_vecTaskList任务列表中,并用pthread_cond_signal唤醒一个线程进行处理。
  StopAll函数停止所有的线程
  Cpp代码
  ************************************************
  代码:
  ××××××××××××××××××××CThread.h
  #ifndef __CTHREAD
  #define __CTHREAD
  #include <vector>
  #include <string>
  #include <pthread.h>
  using namespace std;
  /**
  执行任务的类,设置任务数据并执行
  **/
  class CTask
  {
  protected:
  string m_strTaskName;  //任务的名称
  void* m_ptrData;       //要执行的任务的具体数据
  public:
  CTask(){}
  CTask(string taskName)
  {
  this->m_strTaskName = taskName;
  m_ptrData = NULL;
  }
  virtual int Run()= 0;
  void SetData(void* data);    //设置任务数据
  };
  /**
  线程池
  **/
  class CThreadPool
  {
  private:
  vector<CTask*> m_vecTaskList;         //任务列表
  int m_iThreadNum;                            //线程池中启动的线程数
  static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合
  static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合
  static pthread_mutex_t m_pthreadMutex;    //线程同步锁
  static pthread_cond_t m_pthreadCond;    //线程同步的条件变量
  protected:
  static void* ThreadFunc(void * threadData); //新线程的线程函数
  static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中
  static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去
  int Create();          //创建所有的线程
  public:
  CThreadPool(int threadNum);
  int AddTask(CTask *task);      //把任务添加到线程池中 
  int StopAll();
  };
  #endif
 
类的实现为:
  ××××××××××××××××××××CThread.cpp
  #include "CThread.h"
  #include <string>
  #include <iostream>
  using namespace std;
  void CTask::SetData(void * data)
  {
  m_ptrData = data;
  }
  vector<pthread_t> CThreadPool::m_vecBusyThread;
  vector<pthread_t> CThreadPool::m_vecIdleThread;
  pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
  pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
  CThreadPool::CThreadPool(int threadNum)
  {
  this->m_iThreadNum = threadNum;
  Create();
  }
  int CThreadPool::MoveToIdle(pthread_t tid)
  {
  vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();
  while(busyIter != m_vecBusyThread.end())
  {
  if(tid == *busyIter)
  {
  break;
  }
  busyIter++;
  }
  m_vecBusyThread.erase(busyIter);
  m_vecIdleThread.push_back(tid);
  return 0;
  }
  int CThreadPool::MoveToBusy(pthread_t tid)
  {
  vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();
  while(idleIter != m_vecIdleThread.end())
  {
  if(tid == *idleIter)
  {
  break;
  }
  idleIter++;
  }
  m_vecIdleThread.erase(idleIter);
  m_vecBusyThread.push_back(tid);
  return 0;
  }
  void* CThreadPool::ThreadFunc(void * threadData)
  {
  pthread_t tid = pthread_self();
  while(1)
  {
  pthread_mutex_lock(&m_pthreadMutex);
  pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
  cout 《 "tid:" 《 tid 《 " run" 《 endl;
  //get task
  vector<CTask*>* taskList = (vector<CTask*>*)threadData;
  vector<CTask*>::iterator iter = taskList->begin();
  while(iter != taskList->end())
  {
  MoveToBusy(tid);
  break;
  }
  CTask* task = *iter;
  taskList->erase(iter);
  pthread_mutex_unlock(&m_pthreadMutex);
  cout 《 "idel thread number:" 《 CThreadPool::m_vecIdleThread.size() 《 endl;
  cout 《 "busy thread number:" 《 CThreadPool::m_vecBusyThread.size() 《 endl;
  //cout 《 "task to be run:" 《 taskList->size() 《 endl;
  task->Run();
  //cout 《 "CThread::thread work" 《 endl;
  cout 《 "tid:" 《 tid 《 " idle" 《 endl;
  }
  return (void*)0;
  }
  int CThreadPool::AddTask(CTask *task)
  {
  this->m_vecTaskList.push_back(task);
  pthread_cond_signal(&m_pthreadCond);
  return 0;
  }
  int CThreadPool::Create()
  {
  for(int i = 0; i < m_iThreadNum;i++)
  {
  pthread_t tid = 0;
  pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);
  m_vecIdleThread.push_back(tid);
  }
  return 0;
  }
  int CThreadPool::StopAll()
  {
  vector<pthread_t>::iterator iter = m_vecIdleThread.begin();
  while(iter != m_vecIdleThread.end())
  {
  pthread_cancel(*iter);
  pthread_join(*iter,NULL);
  iter++;
  }
  iter = m_vecBusyThread.begin();
  while(iter != m_vecBusyThread.end())
  {
  pthread_cancel(*iter);
  pthread_join(*iter,NULL);
  iter++;
  }
  return 0;
  }
 
简单示例:
  ××××××××test.cpp
  #include "CThread.h"
  #include <iostream>
  using namespace std;
  class CWorkTask: public CTask
  {
  public:
  CWorkTask()
  {}
  int Run()
  {
  cout 《 (char*)this->m_ptrData 《 endl;
  sleep(10);
  return 0;
  }
  };
  int main()
  {
  CWorkTask taskObj;
  char szTmp[] = "this is the first thread running,haha success";
  taskObj.SetData((void*)szTmp);
  CThreadPool threadPool(10);
  for(int i = 0;i < 11;i++)
  {
  threadPool.AddTask(&taskObj);
  }
  while(1)
  {
  sleep(120);
  }
  return 0;
  }
  
  以上就是系统大全给大家介绍的如何使的方法都有一定的了解了吧,好了,如果大家还想了解更多的资讯,那就赶紧点击系统大全官网吧。 
 
本文来自系统大全http://www.win7cn.com/如需转载请注明!推荐:win7纯净版