Declrations
class condition;
Overview
condition は,何らかの条件を基準にして,スレッド間で待受/通知制御を行うためのクラスです. 何らかの条件を満たせずに(読み込むためのデータが他のスレッドによってセットされていないなど), それ以上処理を続けることができなくなった場合,wait() メソッドでスレッドの実行を一時休止して, 他のスレッドからの通知を待ち受けます.他のスレッドは自分の処理を続け, 休止したスレッドが処理を再開できる状態になれば,notify_one(),または notify_all() メソッドを用いてその旨を通知します.
Example
#include <iostream> #include <sstream> #include <string> #include <list> #include "clx/thread.h" /* ------------------------------------------------------------------------- */ // trivial_queue /* ------------------------------------------------------------------------- */ template <class Type> class trivial_queue { public: typedef Type value_type; trivial_queue() : list_(), mutex_(), not_empty_() {} virtual ~trivial_queue() {} void enqueue(const value_type& x) { clx::mutex::scoped_lock lock(mutex_); std::cout << "PSH: " << x << " (rest " << list_.size(); list_.push_back(x); std::cout << " -> " << list_.size() << ")" << std::endl; not_empty_.notify_one(); } value_type dequeue() { clx::mutex::scoped_lock lock(mutex_); while (list_.empty()) { std::cout << "empty buffer" << std::endl; not_empty_.wait(lock); } value_type tmp = list_.front(); std::cout << "POP: " << tmp << " (rest " << list_.size(); list_.pop_front(); std::cout << " -> " << list_.size() << ")" << std::endl; return tmp; } void reset() { clx::mutex::scoped_lock lock(mutex_); list_.clear(); } private: std::list<value_type> list_; clx::mutex mutex_; clx::condition not_empty_; }; trivial_queue<std::string> data_; clx::once_flag once_ = CLX_ONCE_INIT; /* ------------------------------------------------------------------------- */ // init_data /* ------------------------------------------------------------------------- */ void init_data() { std::cout << "reset data queue" << std::endl; data_.reset(); } /* ------------------------------------------------------------------------- */ // send_something /* ------------------------------------------------------------------------- */ void send_something() { for (int i = 0; i < 10; i++) { clx::call_once(once_, init_data); // call_once test std::stringstream ss; ss << "element[" << i << "]"; data_.enqueue(ss.str()); } } /* ------------------------------------------------------------------------- */ // recv_something /* ------------------------------------------------------------------------- */ void recv_something() { std::string s; for (int i = 0; i < 10; i++) { clx::call_once(once_, init_data); // call_once test s = data_.dequeue(); } } /* ------------------------------------------------------------------------- */ // main /* ------------------------------------------------------------------------- */ int main(int argc, char* argv[]) { clx::thread enq_th[2]; clx::thread deq_th[2]; for (size_t i = 0; i < 2; i++) enq_th[i].start(send_something); for (size_t i = 0; i < 2; i++) deq_th[i].start(recv_something); for (size_t i = 0; i < 2; i++) enq_th[i].join(); for (size_t i = 0; i < 2; i++) deq_th[i].join(); std::cout << "end of main thread" << std::endl; return 0; }
Result reset data queue empty buffer empty buffer PSH: element[0] (rest 0) PSH: element[0] (rest 1) PSH: element[1] (rest 2) PSH: element[1] (rest 3) POP: element[0] (rest 4) PSH: element[2] (rest 3) POP: element[0] (rest 4) ・・・(以下略)・・・
Related Types
typedef ... handle_pointer;
Construction and Member Functions
condition(); virtual ~condition(); template <class LockT> void wait(LockT& lock); template <class LockT> bool timed_wait(LockT& lock, double sec); void notify_one(); void notify_all(); handle_pointer native_handle();
wait(),または timed_wait() メソッドには,mutex::scoped_lock,または recursive_mutex::scoped_lock を指定します.wait() メソッドは, 他のスレッドから通知があるまで待機し続けます.これに対して timed_wait() メソッドは, 最大でsec 秒まで待機します.timed_wait() メソッドは,タイムアウトした場合, false が返ります.