在 C++11 之前 ,C++ 标准并没有提供统一的并发编程标准,也没有提供语言级别的支持。这导致我们在编写可移植的多线程程序时很不方便,往往需要面向不同的平台进行不同的实现,或者引入一些第三方平台,如Boost,pthread_win32 等。 从C++11开始 ,对并发编程进行了语言级别的支持,使用使用C++进行并发编程方便了很多。这里介绍C++11并发编程的相关特性。
1 线程
1.1 线程的创建
std::thread 的构造函数如下:
1 2 template < class Function, class ... Args > explicit thread ( Function&& f, Args&&... args ) ;
我们只需要提供线程函数或函数对象,即可以创建线程,并可以同时指定线程函数的参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #include <iostream> #include <thread> using namespace std;void func (const string& str) { cout<<str<<endl; } class FuncCls {public : void operator () () { cout<<"Funcls called" <<endl; } }; int main (void ) { thread t1 (func,"Hello World" ) ; FuncCls fcls; thread t2 (fcls) ; if (t1. joinable ()){ t1. join (); } t2. join (); return 0 ; }
join
函数将会阻塞线程,直到线程函数执行完毕,主线程才会接着执行。如果线程函数有返回值,返回值将被忽略。 在使用线程对象的过程中,我们需要注意线程对象的生命周期。如果线程对象先于线程函数结束,那么将会出现不可预料的错误。可以通过线程阻塞的方式来等待线程函数执行完(join),或让线程在后台执行。 如果不希望线程被阻塞,可以调用线程的 detach() 函数,将线程与线程对象分离。但需要注意的是,detach 之后的线程无法再使用join来进行阻塞了,即detach之后的线程,我们无法控制了。当我们不确定一个线程是否可以join时,可以先使用 thead::joinable() 来进行判断。
另外,我们还可以通过 std::bind, lambda
来创建线程(其实就是使用函数对象创建线程)。
线程不可以被复制,但是可以被移动:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include <iostream> #include <thread> using namespace std;void func (const string& str) { cout<<str<<endl; } int main (void ) { thread t1 (func,"Hello World" ) ; thread t2 (move(t1)) ; t2. join (); return 0 ; }
线程被移动之后,原来的线程对象将不再代表任何线程。
1.1.1 thread 的一般性用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void func (int nSec) { std::this_thread::sleep_for (std::chrono::seconds (nSec)); cout<<"time out" <<endl; } int main (void ) { thread t1 (func,3 ) ; t1. join (); cout<<std::thread::hardware_concurrency ()<<endl; return 0 ;}
2 互斥量和锁
互斥时用来保护被多个线程同时访问的共享数据 c++11提供了4种语义的互斥量:
std::mutex 独占的互斥量
std::timed_mutex 带超时的独占互斥量,不可递归使用
std::recursive_mutex 递归互斥量,不带超时功能
std::recursive_timed_mutex 带超时的递归互斥量
2.1 独占互斥量
一般的用法是通过 lock()
函数来阻塞线程,直到获得互斥量的所有权,然后开始执行任务。在完成任务后,使用 unlock()
来释放互斥量。lock()
和 unlock()
必须成对出现,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #include <iostream> #include <thread> #include <string> using namespace std;std::mutex g_mu; void func (const string& str,int i) { g_mu.lock (); try { cout<<i<<" " <<str<<endl; } catch (...){ g_mu.unlock (); } g_mu.unlock (); } void sharefunc (const string& str) { for (int i = 0 ; i<20 ; i++){ func (str,i); } } int main (void ) { string str1 ("str_1" ) ,str2 ("str_2" ) ; thread t1 (sharefunc,str1) ,t2 (sharefunc,str2) ; t1. join (); t2. join (); return 0 ; }``` 这里需要注意的是,在使用 lock () 占用互斥量后,必须使用 unlock () 来解除占用,否则互斥量会一直被占用。 我们推荐使用另一种更安全更简单的方法: `lock_guard` 。因为 `lock_guard` 在构造时会自动锁定互斥量,而在退出作用域时会自动解锁,从而避免没有更新 unlock 。 那么上面的的例子中的 `func` 可以这么写: ```cpp void func (const string& str,int i) { std::lock_guard<std::mutex> locker (g_mu) ; cout<<i<<" " <<str<<endl; }
2.2 递归的独占互斥量
std::recursive_mutex
,递归的独占互斥量,它允许同一线程多次获得该互斥锁,可以用来解决同一线程需要多次获取互斥量时的死锁问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class MyPoint {public : MyPoint (int x,int y): m_x (x),m_y (y){} public : void moveX (int i) { std::lock_guard<std::mutex> locker (m_mu) ; m_x += i; } void moveY (int i) { std::lock_guard<std::mutex> locker (m_mu) ; m_y += i; } void move (int x,int y) { std::lock_guard<std::mutex> locker (m_mu) ; moveX (x); moveY (y); } private : std::mutex m_mu; int m_x,m_y; }; void func () { MyPoint pt (0 ,0 ) ; pt.move (-1 , 1 ); } int main (void ) { std::thread t1 (func) ; t1. join (); return 0 ; }
上例会发生死锁。因为我们在调用 move()
时已经锁住了互斥量,move()
内调用 moveX()
时又请求互斥量,但这个时候互斥量被当前线程独占,未出作用域无法释放。这样就发生了死锁。要解决这个问题,一个简单的方法是使用std::recursive_mutex,它允许同一线程多次获得互斥量。
2.3 带超时的互斥量
time_mutex
和 recursive_mutex
是带超时的互斥锁。与前面两种互斥量的区别是它们带有超时等待功能。它们多了两个超时获取锁的接口, try_lock_for
和 try_lock_until
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 std::timed_mutex g_tmu; void func () { while (true ) { if (g_tmu.try_lock_for (std::chrono::seconds (3 ))){ cout<<"get mutex" <<endl; g_tmu.unlock (); } else { break ; } } } int main (void ) { std::thread t1 (func) ; t1. join (); return 0 ; }
2.4 lock_guard
和 unique_lock
在上面的例子中,我们提到了 lock_guard
,它可以在一个作用域内为互斥量提供锁,当 lock_guard
对象析构的时候,互斥量的锁同时被解除。 unique_lock
相比 lock_gurad
更加灵活,可以减小锁的粒度。
同 lock_guard
一样, unique_lock
在构造时立即加锁。但它也可以在构造时不加锁,而等到合适的时候再加锁,这需要为构造函数 unique_lock() 提供 std::defer_lock 参数,当需要加锁的时候,再执行 unique_lock:: lock() 函数来加锁,并可以在合适的时候调用 unique_lock::unlock() 来解锁。unique_lock 可以在其作用域内重复的加解锁,而 lock_guard 不可以。
2.5 call_once
和 once_flag
std::call_once
可以保证函数在多线程环境中只被调用一次。 once_flag
是 call_once
的一个参数。 例如在日志系统中,需要确保日志只被打开一次,我们可以使用 mutex 并加锁,但这并不是线程安全的:
1 2 3 4 5 6 7 8 9 10 11 12 13 class LogFile {public : LogFile (){ if (!m_file.is_open ()){ std::unique_lock<std::mutex> ulocker (m_mu) ; m_file.open ("log.txt" ); } } private : std::ofstream m_file; std::mutex m_mu; };
这个类并不是线程安全的。当第一个线程得到互斥时 m_mu
,并正在对文件进行 open()
操作时,第二个线程可能已经在等待互斥量了,当第一个线程执行完后,第二个线程立刻获得互斥量进行 open()
操作。那么我们进行如下的改进:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class LogFile {public : LogFile (){ { std::unique_lock<std::mutex> ulocker (m_mu) ; if (!m_file.is_open ()){ m_file.open ("log.txt" ); } } } private : std::ofstream m_file; std::mutex m_mu; };
这段代码确实是线程安全的了。但是有另外一个问题,每次构造LogFile的实例的时候,都要执行加锁的代码,这造成了资源的浪费。C++11提供了有效的解决方案:使用 call_once
:
1 2 3 4 5 6 7 8 9 class LogFile {public : LogFile (){ std::call_once (m_fopenOnce,[&](){m_file.open ("log.txt" );}); } private : std::once_flag m_fopenOnce; std::ofstream m_file; };
这段代码, m_file 文件只会被多线程打开一次,而且更高效更简洁更方便。
3 条件变量
条件变量是c++11 提供的另种用于等待的同步机制,它能够阻塞一个或多个线程,直到收到另一个线程发出的通知或者超时才会唤醒当前阻塞的线程。C++11 两种条件就是:
condition_variable
,配合 unique_lock 进行 wait 操作
condition_variable_any
,配合任何带有 lock/unlock 语义的mutex 使用,比较灵活,但效率比 condition_variable
略低。
条件变量经常用于不同线程间共享数据的读取。例如一个简单的生产者消息者问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 #include <iostream> #include <thread> #include <queue> std::queue<int > g_que; std::mutex g_mu; const int g_maxSize = 10 ; void producer () { std::unique_lock<std::mutex> locker (g_mu,std::defer_lock) ; int i = 0 ; while (true ) { if (g_maxSize == g_que.size ()){ std::cout<<” full and wait for consumer”<<std::endl; std::this_thread::sleep_for (std::chrono::milliseconds (200 )); } else { i++; std::this_thread::sleep_for (std::chrono::milliseconds (50 )); locker.lock (); g_que.push (i); std::cout<<” push “<<i<<std::endl; locker.unlock (); } } } void consumer () { std::unique_lock<std::mutex> locker (g_mu,std::defer_lock) ; while (true ) { if (g_que.empty ()){ std::cout<<” empty and wait for producer”<<std::endl; std::this_thread::sleep_for (std::chrono::milliseconds (200 )); } else { locker.lock (); std::cout<<g_que.front ()<<” and left size = “<<g_que.size ()<<std::endl; g_que.pop (); locker.unlock (); std::this_thread::sleep_for (std::chrono::milliseconds (200 )); } } } int main (void ) { std::thread t1 (producer) ; std::thread t2 (consumer) ; t1. join (); t2. join (); return 0 ;}
在这段代码中,生产者和消费者都会去检查产品池的容量:对消费者来说,如果没有产品,则等待生产者向产品池中添加产品;对生产者来说,如果产品池已满,则等待消费者清理产品池空间。在这里,我们让线程 sleep 一段时间来达到这种效果,那么问题来了:等待的时长如何确定?如果等待的时间过长,必然会造成资源的浪费,如果等待的时间过短,则需要更多次的循环才能找到生产或消费的时机。这个时候我们可以使用条件变量。 使用条件变量修改后的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 #include <iostream> #include <thread> #include <queue> #include <condition_variable> std::queue<int > g_que; std::mutex g_mu; std::condition_variable g_notEmpty; std::condition_variable g_notFull; const int g_maxSize = 10 ; void producer () { int i = 0 ; while (true ) { std::unique_lock<std::mutex> locker (g_mu) ; if (g_maxSize == g_que.size ()){ std::cout<<" full and wait for consumer" <<std::endl; g_notFull.wait (locker); } i++; std::this_thread::sleep_for (std::chrono::milliseconds (50 )); g_que.push (i); std::cout<<" push " <<i<<std::endl; locker.unlock (); g_notEmpty.notify_one (); } } void consumer () { while (true ) { std::unique_lock<std::mutex> locker (g_mu) ; if (g_que.empty ()){ std::cout<<" empty and wait for producer" <<std::endl; g_notEmpty.wait (locker); } std::cout<<g_que.front ()<<" and left size = " <<g_que.size ()<<std::endl; g_que.pop (); locker.unlock (); g_notFull.notify_one (); std::this_thread::sleep_for (std::chrono::milliseconds (200 )); } } int main (void ) { std::thread t1 (producer) ; std::thread t2 (consumer) ; t1. join (); t2. join (); return 0 ; }
条件变量使用的一般流程是:
拥有条件变量的线程获取互斥量
检查条件,如果条件不满足则 wait 阻塞,直到 notify 提醒;如果条件满足,则向下执行
某个线程使(2.)的条件满足后,调用 notify_one() 或 notify_all() 唤醒一个或所有等待的线程。
wait 还有一个重载的方法,可以接受一个条件函数f,如果满足条件(f return true),则重新获取mutex,然后结束wait;如果不满足条件(f return false),则释放 mutex,将线程置于 waitting状态,直到收到notify,上面的生产者部分的代码还可以这么写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void producer () { int i = 0 ; while (true ) { std::unique_lock<std::mutex> locker (g_mu) ; g_notFull.wait (locker,[](){ if (g_maxSize == g_que.size ()){ std::cout<<" full and wait for consumer" <<std::endl; return false ; } else { return true ; } }); i++; std::this_thread::sleep_for (std::chrono::milliseconds (50 )); g_que.push (i); std::cout<<" push " <<i<<std::endl; locker.unlock (); g_notEmpty.notify_one (); } }
需要注意,条件 wait 时,会释放 mutex.当条件 wait 结束时,会重新获取 mutex.所以需要在条件wait 之后就获取互斥量,否则可能会引发死锁问题。
4 线程异步操作
4.1 std::future
在前面几节我们知道,通过 thread.join() 不能直接获取线程函数的返回值,需要定义一个变量,通过引用传递给线程函数,再在线程函数中给这个变量赋值。这个过程比较繁琐。 C++11提供了 future 来访问异步操作的结果(因为一个异步操作的结果不能马上获取,而不需要在未来的某个时间来获取,所以叫做future),它是一种获取异步操作结果的通道。
future 有3种状态:
Defeered. 异步操作还没有开始
Ready. 异步操作已经完成
Timeout. 异步操作超时
我们可以通过 future 的状态来了解线程内部的执行情况。 获取 future 的状态有3种方式:
get. 等待异步操作结束并返回结果
wait. 只是等待异步操作完成,没有返回值
wait_for. 超时等待返回结果
使用 std::async()
函数可以直接创建异步任务,并将结果保存在 future 中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 int func () { std::this_thread::sleep_for (std::chrono::seconds (3 )); return 9 ; } int main (void ) { std::future<int > fn = std::async (func); std::future_status stat; do { stat = fn.wait_for (std::chrono::milliseconds (500 )); switch (stat) { case std::future_status::deferred: std::cout<<”deferred”<<std::endl; break ; case std::future_status::ready: std::cout<<”ready”<<std::endl; break ; case std::future_status::timeout: std::cout<<”timeout”<<std::endl; break ; default : break ; } }while (stat != std::future_status::ready ); int n = fn.get ();std::cout<<n<<std::endl; return 0 ;}
如果我们对异步执行的过程不关心,则直接使用 future.get() 函数来等待异步任务的结果就可以了。
例如,我们需要创建一个子线程让其执行任务,接下来可以在主线程里执行其它的任务。在某个时间上,我们需要用到子线程的返回值。如果此时子线程执行完毕,则可以直接获取结果,如果此时子线程还没有执行完毕,则主线程阻塞直到子线程执行完毕。 这个过程使用 mutex 和 condition 的实现是这样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 #include <iostream> #include <thread> #include <future> #include <condition_variable> std::mutex g_mu; std::condition_variable g_val; std::unique_lock<std::mutex> ulocker (g_mu,std::defer_lock) ;void func (int & n) { ulocker.lock (); std::this_thread::sleep_for (std::chrono::seconds (3 )); n = 9 ; ulocker.unlock (); g_val.notify_one (); } int main (void ) { int n = 0 ; std::thread t1 (func,std::ref(n)) ; t1. detach (); std::this_thread::sleep_for (std::chrono::seconds (5 )); if (n == 0 ) g_val.wait (ulocker); std::cout<<n<<std::endl; return 0 ; } 这个过程显得繁琐,时间处理不得当,还有可能引发死锁。这个时候就是 future 上场的时候了: int func () { std::this_thread::sleep_for (std::chrono::seconds (3 )); return 9 ; } int main (void ) { std::future<int > fn = std::async (func); std::this_thread::sleep_for (std::chrono::seconds (5 )); int n = fn.get (); std::cout<<n<<std::endl; return 0 ; }
这个例子中,主线程在开始执行子线程之后立即开始主线程里的工作,而不用关心子线程的执行过程。在某个时间点(主线程执行 5s 之后)需要用到子线程的返回结果,这个时候使用 future.get() 来获取结果。
4.2 std::promise
future 可以实现子线程向主线程传递数据。而 promise
也可以实现从一个线程向另一个线程传递数据。在一个使用 promise 包装过的 future 做参数的线程函数中,可以在一个线程里阻塞,直到另一个线程向其赋值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #include <iostream> #include <thread> #include <future> int func (std::future<int >& f) { std::this_thread::sleep_for (std::chrono::seconds (3 )); int n = f.get (); n = n + 9 ; return n; } int main (void ) { std::promise<int > pr; std::future<int > f = pr.get_future (); std::future<int > fn = std::async ( func, std::ref (f)); pr.set_value (8 ); std::this_thread::sleep_for (std::chrono::seconds (5 )); int n = fn.get (); std::cout<<n<<std::endl; return 0 ; }
这个“承诺”是双向的。可以在主线程里赋值子线程里取值,也可以在子线程里赋值主线程里取值:
1 2 3 4 5 6 7 8 9 10 11 12 void fun (std::promise<int >& pr) { pr.set_value (9 ); } int main () { std::promise<int > pr; std::thread t1 (fun,std::ref(pr)) ; t1. detach (); std::future<int > fu = pr.get_future (); std::cout<<fu.get ()<<std::endl; return 0 ; }
4.3 std::package_task
promise 可用于在线程间传递值,而 package_task
则可以在线程间传递可调用对象(如 function, lambda, bind):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 #include <thread> #include <future> void func (std::packaged_task<int (int )>& pt) { pt (5 ); } int main () { auto af = [](int n){return n + 9 ;}; std::packaged_task<int (int ) > task (af) ; std::thread t1 (func,std::ref(task)) ; t1. detach (); std::future<int > fu = task.get_future (); std::cout<<fu.get ()<<std::endl; return 0 ; }
此例中,通过 task 向线程函数传递一个可调用对象,之后子线程中执行该调用。在主线程中,使用task.get_future().get() 来等待子线程调用执行并返回结果,从来实现异步的可调用对象传递。
4.4 std::async
线程异步操作函数在上几节中已经介绍过了。它的原型如下:
1 async (launch __policy, _Fp&& __f, _Args&&... __args)
launch
有三种定义:
async, 立即创建线程并执行。
deferred, 延迟加载的方式创建线程(在调用 future 的 get(),wait() 函数时才创建线程并执行)
any, any = async | deferred, 是 async 函数的默认参数,立即创建线程并执行。
async
使我们不用过多的关注异步线程的细节,是创建异步操作的首选。