C++11/14 新特性 (多线程)

在 C++11 之前 ,C++ 标准并没有提供统一的并发编程标准,也没有提供语言级别的支持。这导致我们在编写可移植的多线程程序时很不方便,往往需要面向不同的平台进行不同的实现,或者引入一些第三方平台,如Boost,pthread_win32 等。 从C++11开始 ,对并发编程进行了语言级别的支持,使用使用C++进行并发编程方便了很多。这里介绍C++11并发编程的相关特性。

1 线程

1.1 线程的创建

std::thread 的构造函数如下:

1
2
template< class Function, class... Args > 
explicit thread( Function&& f, Args&&... args );

我们只需要提供线程函数或函数对象,即可以创建线程,并可以同时指定线程函数的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <thread>
using namespace std;

void func(const string& str){
cout<<str<<endl;
}

class FuncCls{
public:
void operator()(){
cout<<"Funcls called"<<endl;
}
};

int main(void){
thread t1(func,"Hello World");
FuncCls fcls;
thread t2(fcls);

if(t1.joinable()){
t1.join();
}
t2.join();
return 0;
}

join 函数将会阻塞线程,直到线程函数执行完毕,主线程才会接着执行。如果线程函数有返回值,返回值将被忽略。 在使用线程对象的过程中,我们需要注意线程对象的生命周期。如果线程对象先于线程函数结束,那么将会出现不可预料的错误。可以通过线程阻塞的方式来等待线程函数执行完(join),或让线程在后台执行。 如果不希望线程被阻塞,可以调用线程的 detach() 函数,将线程与线程对象分离。但需要注意的是,detach 之后的线程无法再使用join来进行阻塞了,即detach之后的线程,我们无法控制了。当我们不确定一个线程是否可以join时,可以先使用 thead::joinable() 来进行判断。

另外,我们还可以通过 std::bind, lambda 来创建线程(其实就是使用函数对象创建线程)。

线程不可以被复制,但是可以被移动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <thread>
using namespace std;

void func(const string& str){
cout<<str<<endl;
}

int main(void){
thread t1(func,"Hello World");
thread t2(move(t1));

//t1.join(); //ERROR libc++abi.dylib: terminating with uncaught exception of type std::__1::system_error: thread::join failed: No such process
t2.join();

return 0;
}

线程被移动之后,原来的线程对象将不再代表任何线程。

1.1.1 thread 的一般性用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void func(int nSec){
std::this_thread::sleep_for(std::chrono::seconds(nSec));
cout<<"time out"<<endl;
}

int main(void){
thread t1(func,3); //阻塞休眠3s
t1.join();


cout&lt;&lt;std::thread::hardware_concurrency()&lt;&lt;endl; //获取cpu核心数

return 0;
}

2 互斥量和锁

互斥时用来保护被多个线程同时访问的共享数据 c++11提供了4种语义的互斥量:

  • std::mutex 独占的互斥量
  • std::timed_mutex 带超时的独占互斥量,不可递归使用
  • std::recursive_mutex 递归互斥量,不带超时功能
  • std::recursive_timed_mutex 带超时的递归互斥量

2.1 独占互斥量

一般的用法是通过 lock() 函数来阻塞线程,直到获得互斥量的所有权,然后开始执行任务。在完成任务后,使用 unlock() 来释放互斥量。lock()unlock() 必须成对出现,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <iostream>
#include <thread>
#include <string>
using namespace std;

std::mutex g_mu;

void func(const string& str,int i){
g_mu.lock();
try{
cout<<i<<" "<<str<<endl;
//do something
}
catch(...){
g_mu.unlock();
}
g_mu.unlock();
}

void sharefunc(const string& str){
for(int i = 0; i<20; i++){
func(str,i);
}
}

int main(void){
string str1("str_1"),str2("str_2");
thread t1(sharefunc,str1),t2(sharefunc,str2);
t1.join();
t2.join();

return 0;
}```

这里需要注意的是,在使用 lock() 占用互斥量后,必须使用 unlock() 来解除占用,否则互斥量会一直被占用。 我们推荐使用另一种更安全更简单的方法: `lock_guard` 。因为 `lock_guard` 在构造时会自动锁定互斥量,而在退出作用域时会自动解锁,从而避免没有更新 unlock 。

那么上面的的例子中的 `func` 可以这么写:

```cpp
void func(const string& str,int i){
std::lock_guard<std::mutex> locker(g_mu);
cout<<i<<" "<<str<<endl;
}

2.2 递归的独占互斥量

std::recursive_mutex,递归的独占互斥量,它允许同一线程多次获得该互斥锁,可以用来解决同一线程需要多次获取互斥量时的死锁问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class MyPoint {
public:
MyPoint(int x,int y):
m_x(x),m_y(y){}
public:
void moveX(int i){
std::lock_guard<std::mutex> locker(m_mu);
m_x += i;
}

void moveY(int i){
std::lock_guard<std::mutex> locker(m_mu);
m_y += i;
}

void move(int x,int y){
std::lock_guard<std::mutex> locker(m_mu);
moveX(x);
moveY(y);
}
private:
std::mutex m_mu;
int m_x,m_y;
};

void func(){
MyPoint pt(0,0);
pt.move(-1, 1);
}


int main(void){
std::thread t1(func);
t1.join();

return 0;
}

上例会发生死锁。因为我们在调用 move() 时已经锁住了互斥量,move() 内调用 moveX() 时又请求互斥量,但这个时候互斥量被当前线程独占,未出作用域无法释放。这样就发生了死锁。要解决这个问题,一个简单的方法是使用std::recursive_mutex,它允许同一线程多次获得互斥量。

2.3 带超时的互斥量
time_mutexrecursive_mutex 是带超时的互斥锁。与前面两种互斥量的区别是它们带有超时等待功能。它们多了两个超时获取锁的接口, try_lock_fortry_lock_until

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

std::timed_mutex g_tmu;

void func(){
while (true) {
if(g_tmu.try_lock_for(std::chrono::seconds(3))){
cout<<"get mutex"<<endl;
g_tmu.unlock();
}
else{
break;
}
}
}

int main(void){
std::thread t1(func);
t1.join();

return 0;
}

2.4 lock_guardunique_lock

在上面的例子中,我们提到了 lock_guard ,它可以在一个作用域内为互斥量提供锁,当 lock_guard 对象析构的时候,互斥量的锁同时被解除。 unique_lock 相比 lock_gurad 更加灵活,可以减小锁的粒度。

lock_guard 一样, unique_lock 在构造时立即加锁。但它也可以在构造时不加锁,而等到合适的时候再加锁,这需要为构造函数 unique_lock() 提供 std::defer_lock 参数,当需要加锁的时候,再执行 unique_lock:: lock() 函数来加锁,并可以在合适的时候调用 unique_lock::unlock() 来解锁。unique_lock 可以在其作用域内重复的加解锁,而 lock_guard 不可以。

2.5 call_onceonce_flag

std::call_once 可以保证函数在多线程环境中只被调用一次。 once_flagcall_once 的一个参数。 例如在日志系统中,需要确保日志只被打开一次,我们可以使用 mutex 并加锁,但这并不是线程安全的:

1
2
3
4
5
6
7
8
9
10
11
12
13

class LogFile{
public:
LogFile(){
if(!m_file.is_open()){
std::unique_lock<std::mutex> ulocker(m_mu);
m_file.open("log.txt");
}
}
private:
std::ofstream m_file;
std::mutex m_mu; //用于锁定 m_file
};

这个类并不是线程安全的。当第一个线程得到互斥时 m_mu ,并正在对文件进行 open() 操作时,第二个线程可能已经在等待互斥量了,当第一个线程执行完后,第二个线程立刻获得互斥量进行 open() 操作。那么我们进行如下的改进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class LogFile{
public:
LogFile(){
{
std::unique_lock<std::mutex> ulocker(m_mu);
if(!m_file.is_open()){
m_file.open("log.txt");
}
}
}
private:
std::ofstream m_file;
std::mutex m_mu; //用于锁定 m_file
};

这段代码确实是线程安全的了。但是有另外一个问题,每次构造LogFile的实例的时候,都要执行加锁的代码,这造成了资源的浪费。C++11提供了有效的解决方案:使用 call_once:

1
2
3
4
5
6
7
8
9
class LogFile{
public:
LogFile(){
std::call_once(m_fopenOnce,[&](){m_file.open("log.txt");});
}
private:
std::once_flag m_fopenOnce;
std::ofstream m_file;
};

这段代码, m_file 文件只会被多线程打开一次,而且更高效更简洁更方便。

3 条件变量

条件变量是c++11 提供的另种用于等待的同步机制,它能够阻塞一个或多个线程,直到收到另一个线程发出的通知或者超时才会唤醒当前阻塞的线程。C++11 两种条件就是:

condition_variable ,配合 unique_lock 进行 wait 操作
condition_variable_any ,配合任何带有 lock/unlock 语义的mutex 使用,比较灵活,但效率比 condition_variable 略低。

条件变量经常用于不同线程间共享数据的读取。例如一个简单的生产者消息者问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <iostream>
#include <thread>
#include <queue>

std::queue<int> g_que; //产品池
std::mutex g_mu;
const int g_maxSize = 10; //产品池最大容量


// 生产者
void producer(){
std::unique_lock<std::mutex> locker(g_mu,std::defer_lock);
int i = 0;
while (true) {
if(g_maxSize == g_que.size()){ //当产品池满时,暂停生产并等待消费者减少库存
std::cout<<” full and wait for consumer”<<std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
else{
i++;
std::this_thread::sleep_for(std::chrono::milliseconds(50));
locker.lock();
g_que.push(i);
std::cout<<” push “<<i<<std::endl;
locker.unlock();
}
}
}


//消费者
void consumer(){
std::unique_lock<std::mutex> locker(g_mu,std::defer_lock);
while (true) {
if(g_que.empty()){ //当无产品时,暂停消费并等待生产者生产产品
std::cout<<” empty and wait for producer”<<std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
else{
locker.lock();
std::cout<<g_que.front()<<” and left size = “<<g_que.size()<<std::endl;
g_que.pop();
locker.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
}


int main(void){
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();


return 0;
}

在这段代码中,生产者和消费者都会去检查产品池的容量:对消费者来说,如果没有产品,则等待生产者向产品池中添加产品;对生产者来说,如果产品池已满,则等待消费者清理产品池空间。在这里,我们让线程 sleep 一段时间来达到这种效果,那么问题来了:等待的时长如何确定?如果等待的时间过长,必然会造成资源的浪费,如果等待的时间过短,则需要更多次的循环才能找到生产或消费的时机。这个时候我们可以使用条件变量。 使用条件变量修改后的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <iostream>
#include <thread>
#include <queue>
#include <condition_variable>
std::queue<int> g_que; //产品池
std::mutex g_mu; //保护产品池的互斥量
std::condition_variable g_notEmpty; //产品池不为空的条件
std::condition_variable g_notFull; //产品池不满的条件
const int g_maxSize = 10; //产品池最大容量


// 生产者
void producer(){
int i = 0;
while (true) {
std::unique_lock<std::mutex> locker(g_mu);
if(g_maxSize == g_que.size()){
std::cout<<" full and wait for consumer"<<std::endl;
g_notFull.wait(locker); //当产品池满时,阻塞当前线程,暂停生产并等待 notify 提醒
}
i++;
std::this_thread::sleep_for(std::chrono::milliseconds(50));
g_que.push(i);
std::cout<<" push "<<i<<std::endl;
locker.unlock();
g_notEmpty.notify_one();
}
}

//消费者
void consumer(){
while (true) {
std::unique_lock<std::mutex> locker(g_mu);
if(g_que.empty()){
std::cout<<" empty and wait for producer"<<std::endl;
g_notEmpty.wait(locker); //当无产品时,阻塞线程,暂停消费并等待 生产者添加产品后发送 notify 提醒
}
std::cout<<g_que.front()<<" and left size = "<<g_que.size()<<std::endl;
g_que.pop();
locker.unlock();
g_notFull.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(200));

}
}

int main(void){
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();

return 0;
}

条件变量使用的一般流程是:

拥有条件变量的线程获取互斥量
检查条件,如果条件不满足则 wait 阻塞,直到 notify 提醒;如果条件满足,则向下执行
某个线程使(2.)的条件满足后,调用 notify_one() 或 notify_all() 唤醒一个或所有等待的线程。
wait 还有一个重载的方法,可以接受一个条件函数f,如果满足条件(f return true),则重新获取mutex,然后结束wait;如果不满足条件(f return false),则释放 mutex,将线程置于 waitting状态,直到收到notify,上面的生产者部分的代码还可以这么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 生产者
void producer(){
int i = 0;
while (true) {
std::unique_lock<std::mutex> locker(g_mu);
g_notFull.wait(locker,[](){
if(g_maxSize == g_que.size()){
std::cout<<" full and wait for consumer"<<std::endl;
return false;
}
else{
return true;
}
});
i++;
std::this_thread::sleep_for(std::chrono::milliseconds(50));
g_que.push(i);
std::cout<<" push "<<i<<std::endl;
locker.unlock();
g_notEmpty.notify_one();
}
}

需要注意,条件 wait 时,会释放 mutex.当条件 wait 结束时,会重新获取 mutex.所以需要在条件wait 之后就获取互斥量,否则可能会引发死锁问题。

4 线程异步操作

4.1 std::future

在前面几节我们知道,通过 thread.join() 不能直接获取线程函数的返回值,需要定义一个变量,通过引用传递给线程函数,再在线程函数中给这个变量赋值。这个过程比较繁琐。 C++11提供了 future 来访问异步操作的结果(因为一个异步操作的结果不能马上获取,而不需要在未来的某个时间来获取,所以叫做future),它是一种获取异步操作结果的通道。

future 有3种状态:

  • Defeered. 异步操作还没有开始
  • Ready. 异步操作已经完成
  • Timeout. 异步操作超时

我们可以通过 future 的状态来了解线程内部的执行情况。 获取 future 的状态有3种方式:

  • get. 等待异步操作结束并返回结果
  • wait. 只是等待异步操作完成,没有返回值
  • wait_for. 超时等待返回结果

使用 std::async() 函数可以直接创建异步任务,并将结果保存在 future 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int func(){
std::this_thread::sleep_for(std::chrono::seconds(3));
return 9;
}

int main(void){
std::future<int> fn = std::async(func);
std::future_status stat;
do{
stat = fn.wait_for(std::chrono::milliseconds(500));
switch (stat) {
case std::future_status::deferred:
std::cout<<”deferred”<<std::endl;
break;
case std::future_status::ready:
std::cout<<”ready”<<std::endl;
break;
case std::future_status::timeout:
std::cout<<”timeout”<<std::endl;
break;


default:
break;
}
}while(stat != std::future_status::ready );

int n = fn.get();
std::cout&lt;&lt;n&lt;&lt;std::endl;
return 0;
}

如果我们对异步执行的过程不关心,则直接使用 future.get() 函数来等待异步任务的结果就可以了。
例如,我们需要创建一个子线程让其执行任务,接下来可以在主线程里执行其它的任务。在某个时间上,我们需要用到子线程的返回值。如果此时子线程执行完毕,则可以直接获取结果,如果此时子线程还没有执行完毕,则主线程阻塞直到子线程执行完毕。 这个过程使用 mutex 和 condition 的实现是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <iostream>
#include <thread>
#include <future>
#include <condition_variable>

std::mutex g_mu;
std::condition_variable g_val;
std::unique_lock<std::mutex> ulocker(g_mu,std::defer_lock);

void func(int& n){
ulocker.lock();

std::this_thread::sleep_for(std::chrono::seconds(3));
n = 9;

ulocker.unlock();
g_val.notify_one();
}

int main(void){
int n = 0;
std::thread t1(func,std::ref(n));
t1.detach();

//do something
std::this_thread::sleep_for(std::chrono::seconds(5));
if(n == 0)
g_val.wait(ulocker);
std::cout<<n<<std::endl;

return 0;
}
这个过程显得繁琐,时间处理不得当,还有可能引发死锁。这个时候就是 future 上场的时候了:

int func(){
std::this_thread::sleep_for(std::chrono::seconds(3));
return 9;
}

int main(void){
std::future<int> fn = std::async(func);

//do something ...
std::this_thread::sleep_for(std::chrono::seconds(5));

int n = fn.get();
std::cout<<n<<std::endl;
return 0;
}

这个例子中,主线程在开始执行子线程之后立即开始主线程里的工作,而不用关心子线程的执行过程。在某个时间点(主线程执行 5s 之后)需要用到子线程的返回结果,这个时候使用 future.get() 来获取结果。

4.2 std::promise

future 可以实现子线程向主线程传递数据。而 promise 也可以实现从一个线程向另一个线程传递数据。在一个使用 promise 包装过的 future 做参数的线程函数中,可以在一个线程里阻塞,直到另一个线程向其赋值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <thread>
#include <future>

int func(std::future<int>& f){
//do something
std::this_thread::sleep_for(std::chrono::seconds(3));
int n = f.get(); //阻塞,直到另一个线程给赋值
n = n + 9;
return n;
}

int main(void){
std::promise<int> pr;
std::future<int> f = pr.get_future();

std::future<int> fn = std::async( func, std::ref(f));

//do something

pr.set_value(8); //赋值

//do something
std::this_thread::sleep_for(std::chrono::seconds(5));

int n = fn.get();

std::cout<<n<<std::endl;
return 0;
}

这个“承诺”是双向的。可以在主线程里赋值子线程里取值,也可以在子线程里赋值主线程里取值:

1
2
3
4
5
6
7
8
9
10
11
12
void fun(std::promise<int>& pr){
pr.set_value(9);
}

int main(){
std::promise<int> pr;
std::thread t1(fun,std::ref(pr));
t1.detach();
std::future<int> fu = pr.get_future();
std::cout<<fu.get()<<std::endl;
return 0;
}

4.3 std::package_task

promise 可用于在线程间传递值,而 package_task 则可以在线程间传递可调用对象(如 function, lambda, bind):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <thread>
#include <future>

void func(std::packaged_task<int(int)>& pt){
pt(5);
}

int main(){
auto af = [](int n){return n + 9;};

std::packaged_task<int(int)> task(af);
std::thread t1(func,std::ref(task));
t1.detach();
std::future<int> fu = task.get_future();
std::cout<<fu.get()<<std::endl;

return 0;
}

此例中,通过 task 向线程函数传递一个可调用对象,之后子线程中执行该调用。在主线程中,使用task.get_future().get() 来等待子线程调用执行并返回结果,从来实现异步的可调用对象传递。

4.4 std::async

线程异步操作函数在上几节中已经介绍过了。它的原型如下:

1
async(launch __policy, _Fp&& __f, _Args&&... __args)

launch 有三种定义:

  • async, 立即创建线程并执行。
  • deferred, 延迟加载的方式创建线程(在调用 future 的 get(),wait() 函数时才创建线程并执行)
  • any, any = async | deferred, 是 async 函数的默认参数,立即创建线程并执行。

async 使我们不用过多的关注异步线程的细节,是创建异步操作的首选。