スレッドを事前に生成しプールしておくと、実際に使用するときにスレッド生成のオーバーヘッドを避けることができる。このようなパターンをWoker threadパターンと言うらしいので、実際に実装してみた。
仕様はこんな感じ。
- スレッドプールを作成するときにプールサイズを指定できる。
- スレッドプールを作成するときにプールサイズを指定しない場合は、デフォルト値のスレッドが用意される。
- リクエストメソッドでスレッドにタスクを渡す
- クローズメソッドで残っているタスクが完了次第、すべてのスレッドを削除する。
- スレッド追加メソッドで引数個のスレッドをプールに追加する。
- スレッド削除メソッドで引数個のスレッドをプールから削除する。引数が現在のスレッド数を超えている場合はfalseを返す。
- スレッドを削除するときはアイドル状態のスレッドから削除する。
実装はboost C++ Librariesのthreadを使用した。
クラスの定義は次の通り。
- class DThreadPool : boost::noncopyable {
- private:
- std::queue<boost::function0<void> > queue_; //タスクを入れるキュー
- std::map<boost::thread::id, boost::thread *> threadMap_; //スレッドIDとスレッドのマップ
- std::map<boost::thread::id, bool> idleMap_; //スレッドIDをスレッド状態のマップ
- boost::thread_group thGrp_;
- boost::mutex mutex_;
- boost::condition_variable_any condition_;
- bool isAcceptable_; //タスク受付可能かどうかのフラグ
- void run(); //スレッドで実行するタスク待ち関数
- void setIdleFlag(boost::thread::id threadID, bool idleFlag);
- public:
- DThreadPool();
- DThreadPool(int size);
- ~DThreadPool();
- void request(boost::function0<void> func); //タスクをキューに入れる関数
- void close(); //クローズ関数
- int size();
- void addThread(int n); //スレッド追加関数
- bool removeThread(int n); //スレッド削除関数
- };
スレッドの生成は次のようになっている。
- DThreadPool::DThreadPool(int size)
- {
- this->isAcceptable_ = true;
- for (int i = 0; i < size; i++){
- boost::thread* th = thGrp_.create_thread(boost::bind(&DThreadPool::run, this));
- this->threadMap_[th->get_id()] = th;
- }
- }
まず、3行目でタスクを受付可能とする。4〜6行目でsize個のスレッドにrun関数を渡して生成する。それぞれのスレッドをスレッドIDをキー、スレッドへのポインタを値としてマップに入れる。
run関数は次のようになっている。
- void DThreadPool::run()
- {
- while (isAcceptable_){
- boost::mutex::scoped_lock lock(this->mutex_);
- setIdleFlag(boost::this_thread::get_id(), true);
- while (this->queue_.empty()){
- this->condition_.wait(lock);
- }
- setIdleFlag(boost::this_thread::get_id(), false);
- boost::function0<void> func = this->queue_.front();
- this->queue_.pop();
- func();
- }
- while (!this->queue_.empty()){
- boost::function0<void> func = this->queue_.front();
- this->queue_.pop();
- func();
- }
- }
3〜15行目は受付可能な状態での処理。まず、ロックをかけキューにタスクが入るの待機する状態になる。このとき自分のスレッドにアイドルフラグをセットする。キューに何かが入ってくると、アイドルフラグを外し、キューから取り出して実行する。16〜20行目は受付不可の状態での処理。キューが空になるまで、キューからタスクを取り出し実行する。キューが空になった時点で、関数が終了する。
タスクをキューに入れる処理は次のようになっている。
- void DThreadPool::request(boost::function0<void> func)
- {
- boost::mutex::scoped_lock lock(this->mutex_);
- this->queue_.push(func);
- this->condition_.notify_all();
- }
ただロックかけて、キューにプッシュして、ロックを外しているだけ。
クローズ処理は次のようになっている。
- void DThreadPool::close()
- {
- this->isAcceptable_ = false;
- for(std::map<boost::thread::id, boost::thread*>::iterator iter = this->threadMap_.begin();
- iter != this->threadMap_.end();
- ++iter){
- (iter->second)->interrupt();
- (iter->second)->join();
- this->thGrp_.remove_thread(iter->second);
- }
- }
まず、3行目で受付不可状態にする。4〜10行目ですべてのスレッドに割り込みをかけて、終了を待つ。終了したスレッドはプールから削除する。デストラクタ内でこの関数を呼ぶことで、安全に終了することができる。
スレッドの追加は次の通り。
- void DThreadPool::addThread(int n)
- {
- if(!this->isAcceptable_){
- this->isAcceptable_ = true;
- }
- for (int i = 0; i < n; i++){
- boost::thread* th = thGrp_.create_thread(boost::bind(&DThreadPool::run, this));
- this->threadMap_[th->get_id()] = th;
- }
- }
要はコンストラクタ内でやっていることと同じことをしている。違う点は受付状態を確認してから受付可能にするところ。
スレッドの削除は次の通り。
- bool DThreadPool::removeThread(int n)
- {
- if(n > this->thGrp_.size()){
- return false;
- }
- int count = 0;
- while(count < n){
- for(std::map<boost::thread::id, boost::thread*>::iterator iter = this->threadMap_.begin();
- iter != this->threadMap_.end();
- ++iter){
- if(count < n){
- if(this->idleMap_[iter->first]){
- boost::thread* th = this->threadMap_[iter->first];
- th->interrupt();
- th->join();
- this->thGrp_.remove_thread(th);
- this->idleMap_.erase(iter->first);
- count++;
- }
- }
- }
- }
- if(this->thGrp_.size() == 0){
- this->isAcceptable_ = false;
- }
- return true;
- }
まず、3〜5行目で削除するスレッド数が現在のスレッド数より多いかをチェックする。多い場合はfalseを返す。次に7〜20行目で削除する個数分だけ、アイドルフラグが立っているスレッドを探して、割り込み処理→終了待ち→削除としていく。このときスレッド状態マップから該当するスレッドIDを削除する。23〜25行目でサイズが0になった場合は受付不可にする。
ソースはこちら→ threadpool.tar.gzをダウンロード
参考サイト
コメント