首页 > Coding > C++11/14 新特性 (多线程)

C++11/14 新特性 (多线程)

2017年3月28日 发表评论 阅读评论

在 C++11 之前 ,C++ 标准并没有提供统一的并发编程标准,也没有提供语言级别的支持。这导致我们在编写可移植的多线程程序时很不方便,往往需要面向不同的平台进行不同的实现,或者引入一些第三方平台,如Boost,pthread_win32 等。 从C++11开始 ,对并发编程进行了语言级别的支持,使用使用C++进行并发编程方便了很多。这里介绍C++11并发编程的相关特性。

1 线程

1.1 线程的创建

std::thread 的构造函数如下:

我们只需要提供线程函数或函数对象,即可以创建线程,并可以同时指定线程函数的参数。

join函数将会阻塞线程,直到线程函数执行完毕,主线程才会接着执行。如果线程函数有返回值,返回值将被忽略。 在使用线程对象的过程中,我们需要注意线程对象的生命周期。如果线程对象先于线程函数结束,那么将会出现不可预料的错误。可以通过线程阻塞的方式来等待线程函数执行完(join),或让线程在后台执行。 如果不希望线程被阻塞,可以调用线程的 detach() 函数,将线程与线程对象分离。但需要注意的是,detach 之后的线程无法再使用join来进行阻塞了,即detach之后的线程,我们无法控制了。当我们不确定一个线程是否可以join时,可以先使用 thead::joinable() 来进行判断。

另外,我们还可以通过 std::bind, lambda 来创建线程(其实就是使用函数对象创建线程)。

线程不可以被复制,但是可以被移动:

线程被移动之后,原来的线程对象将不再代表任何线程。

1.1.1 thread 的一般性用法

2 互斥量和锁

互斥时用来保护被多个线程同时访问的共享数据 c++11提供了4种语义的互斥量:

  • std::mutex 独占的互斥量
  • std::timed_mutex 带超时的独占互斥量,不可递归使用
  • std::recursive_mutex 递归互斥量,不带超时功能
  • std::recursive_timed_mutex 带超时的递归互斥量

2.1 独占互斥量

一般的用法是通过 lock() 函数来阻塞线程,直到获得互斥量的所有权,然后开始执行任务。在完成任务后,使用 unlock() 来释放互斥量。lock() 和 unlock() 必须成对出现,

这里需要注意的是,在使用 lock() 占用互斥量后,必须使用 unlock() 来解除占用,否则互斥量会一直被占用。 我们推荐使用另一种更安全更简单的方法: lock_guard 。因为 lock_guard 在构造时会自动锁定互斥量,而在退出作用域时会自动解锁,从而避免没有更新 unlock 。

那么上面的的例子中的func可以这么写:

2.2 递归的独占互斥量

std::recursive_mutex,递归的独占互斥量,它允许同一线程多次获得该互斥锁,可以用来解决同一线程需要多次获取互斥量时的死锁问题。

上例会发生死锁。因为我们在调用 move() 时已经锁住了互斥量,move() 内调用 moveX() 时又请求互斥量,但这个时候互斥量被当前线程独占,未出作用域无法释放。这样就发生了死锁。要解决这个问题,一个简单的方法是使用std::recursive_mutex,它允许同一线程多次获得互斥量。

2.3 带超时的互斥量

time_mutex 和 recursive_mutex 是带超时的互斥锁。与前面两种互斥量的区别是它们带有超时等待功能。它们多了两个超时获取锁的接口, try_lock_for 和 try_lock_until 。

2.4 lock_guard 和 unique_lock

在上面的例子中,我们提到了 lock_guard ,它可以在一个作用域内为互斥量提供锁,当 lock_guard 对象析构的时候,互斥量的锁同时被解除。 unique_lock 相比 lock_gurad 更加灵活,可以减小锁的粒度。

同 lock_guard 一样, unique_lock 在构造时立即加锁。但它也可以在构造时不加锁,而等到合适的时候再加锁,这需要为构造函数 unique_lock() 提供 std::defer_lock 参数,当需要加锁的时候,再执行 unique_lock:: lock() 函数来加锁,并可以在合适的时候调用 unique_lock::unlock() 来解锁。unique_lock 可以在其作用域内重复的加解锁,而 lock_guard 不可以。

2.5 call_once 和 once_flag

std::call_once 可以保证函数在多线程环境中只被调用一次。 once_flag 是 call_once 的一个参数。 例如在日志系统中,需要确保日志只被打开一次,我们可以使用 mutex 并加锁,但这并不是线程安全的:

这个类并不是线程安全的。当第一个线程得到互斥时 m_mu ,并正在对文件进行 open() 操作时,第二个线程可能已经在等待互斥量了,当第一个线程执行完后,第二个线程立刻获得互斥量进行 open() 操作。那么我们进行如下的改进:

这段代码确实是线程安全的了。但是有另外一个问题,每次构造LogFile的实例的时候,都要执行加锁的代码,这造成了资源的浪费。C++11提供了有效的解决方案:使用 call_once:

这段代码, m_file 文件只会被多线程打开一次,而且更高效更简洁更方便。

3 条件变量

条件变量是c++11 提供的另种用于等待的同步机制,它能够阻塞一个或多个线程,直到收到另一个线程发出的通知或者超时才会唤醒当前阻塞的线程。C++11 两种条件就是:

  • condition_variable ,配合 unique_lock 进行 wait 操作
  • condition_variable_any ,配合任何带有 lock/unlock 语义的mutex 使用,比较灵活,但效率比 condition_variable 略低。

条件变量经常用于不同线程间共享数据的读取。例如一个简单的生产者消息者问题:

在这段代码中,生产者和消费者都会去检查产品池的容量:对消费者来说,如果没有产品,则等待生产者向产品池中添加产品;对生产者来说,如果产品池已满,则等待消费者清理产品池空间。在这里,我们让线程 sleep 一段时间来达到这种效果,那么问题来了:等待的时长如何确定?如果等待的时间过长,必然会造成资源的浪费,如果等待的时间过短,则需要更多次的循环才能找到生产或消费的时机。这个时候我们可以使用条件变量。 使用条件变量修改后的代码:

条件变量使用的一般流程是:

  1. 拥有条件变量的线程获取互斥量
  2. 检查条件,如果条件不满足则 wait 阻塞,直到 notify 提醒;如果条件满足,则向下执行
  3. 某个线程使(2.)的条件满足后,调用 notify_one() 或 notify_all() 唤醒一个或所有等待的线程。

wait 还有一个重载的方法,可以接受一个条件函数f,如果满足条件(f return true),则重新获取mutex,然后结束wait;如果不满足条件(f return false),则释放 mutex,将线程置于 waitting状态,直到收到notify,上面的生产者部分的代码还可以这么写:

需要注意,条件 wait 时,会释放 mutex.当条件 wait 结束时,会重新获取 mutex.所以需要在条件wait 之后就获取互斥量,否则可能会引发死锁问题。

4 线程异步操作

4.1 std::future

在前面几节我们知道,通过 thread.join() 不能直接获取线程函数的返回值,需要定义一个变量,通过引用传递给线程函数,再在线程函数中给这个变量赋值。这个过程比较繁琐。 C++11提供了 future 来访问异步操作的结果(因为一个异步操作的结果不能马上获取,而不需要在未来的某个时间来获取,所以叫做future),它是一种获取异步操作结果的通道。

future 有3种状态:

  1. Defeered. 异步操作还没有开始
  2. Ready. 异步操作已经完成
  3. Timeout. 异步操作超时

我们可以通过 future 的状态来了解线程内部的执行情况。 获取 future 的状态有3种方式:

  1. get. 等待异步操作结束并返回结果
  2. wait. 只是等待异步操作完成,没有返回值
  3. wait_for. 超时等待返回结果

使用 std::async() 函数可以直接创建异步任务,并将结果保存在 future 中:

如果我们对异步执行的过程不关心,则直接使用 future.get() 函数来等待异步任务的结果就可以了。

例如,我们需要创建一个子线程让其执行任务,接下来可以在主线程里执行其它的任务。在某个时间上,我们需要用到子线程的返回值。如果此时子线程执行完毕,则可以直接获取结果,如果此时子线程还没有执行完毕,则主线程阻塞直到子线程执行完毕。 这个过程使用 mutex 和 condition 的实现是这样的:

这个过程显得繁琐,时间处理不得当,还有可能引发死锁。这个时候就是 future 上场的时候了:

这个例子中,主线程在开始执行子线程之后立即开始主线程里的工作,而不用关心子线程的执行过程。在某个时间点(主线程执行 5s 之后)需要用到子线程的返回结果,这个时候使用 future.get() 来获取结果。

4.2 std::promise

future 可以实现子线程向主线程传递数据。而 promise 也可以实现从一个线程向另一个线程传递数据。在一个使用 promise 包装过的 future 做参数的线程函数中,可以在一个线程里阻塞,直到另一个线程向其赋值。有没有觉得很浪漫 :-) 。

这个“承诺”是双向的。可以在主线程里赋值子线程里取值,也可以在子线程里赋值主线程里取值:

4.3 std::package_task

promise 可用于在线程间传递值,而 package_task 则可以在线程间传递可调用对象(如 function, lambda, bind):

此例中,通过 task 向线程函数传递一个可调用对象,之后子线程中执行该调用。在主线程中,使用task.get_future().get() 来等待子线程调用执行并返回结果,从来实现异步的可调用对象传递。

4.4 std::async

线程异步操作函数在上几节中已经介绍过了。它的原型如下:

launch 有三种定义:

  1. async, 立即创建线程并执行。
  2. deferred, 延迟加载的方式创建线程(在调用 future 的 get(),wait() 函数时才创建线程并执行)
  3. any, any = async | deferred, 是 async 函数的默认参数,立即创建线程并执行。

async 使我们不用过多的关注异步线程的细节,是创建异步操作的首选。