【C++】多线程同步

技术博客 (274) 2023-09-16 16:10:03

线程间的同步方法大体可以分为两类:用户模式和内核模式。内核模式就是指利用系统内核对象的单一性来进行同步,使用时需要切换内核态与用户态,而用户模式就是不需要切换到内核态,只在用户态完成操作。

  • 用户模式下的方法有:原子操作(例如一个单一的全局变量)、临界区。特点是:同步速度特别快。
  • 内核模式下的方法有:事件、信号量、互斥量。同步速度较慢,但适用性比较好。

**临界区:**通过对多线程的串行化来访问公共资源或一段代码、速度快,适合控制数据访问。

**互斥量:**为协调共同对一个共享资源的单独访问而设计的。

**信号量:**为控制一个具有有限数量用户资源而设计的。

**事件:**用来通知线程有一些时间已发生,从而启动后继任务的开始。

1 atomic

atomic<int> num{0};
int main(int, char **)
{
    std::cout << "========boot=======" << endl;

    //检查是否无锁的
    std::cout << "num.is_lock_free():" << num.is_lock_free() << endl;

    num.store(10);                             //存储值
    std::cout << "num:" << num.load() << endl; //读取值

    int a = num.exchange(100); //交换值,返回原来的值
    std::cout << "num:" << num.load() << endl;

    std::cout << "========over=======" << endl;
}

atomic并不能保证类型T是无锁的,另外不同平台的处理器处理方式不同,也不能保证必定无锁,所以该类型都会有is_lock_free() 函数来判断是否无锁。

有一个比较特殊的原子类型是atomic_flag,因为atomic_flag与其他原子类型不同,它是无锁的,即线程对其访问不需要加锁,而其他的原子类型不一定是无锁的。

atomic_flag flag = ATOMIC_FLAG_INIT; //初始化
int main(int, char **)
{
    std::cout << "========boot=======" << endl;

    //之前还未设置标志,所以调用的时候返回false,然后设置了标志
    std::cout << "ret:" << flag.test_and_set() << endl;
    //再次调用的时候已经设置过标志,所以返回true
    std::cout << "ret:" << flag.test_and_set() << endl;

    flag.clear(); //清除标志
    //清除标志后,返回值为false,并且设置了标志
    std::cout << "ret:" << flag.test_and_set() << endl;
    //设置标志后,返回值为true
    std::cout << "ret:" << flag.test_and_set() << endl;
    std::cout << "========over=======" << endl;
}

2 临界区

在Linux平台下,没有临界区的概念。

#include <windows.h>
CRITICAL_SECTION cs; //定义临界区对象
void foo()
{
    EnterCriticalSection(&cs);
    /* code 公共资源代码*/
    LeaveCriticalSection(&cs);
}

int main(int, char **)
{
    std::cout << "========boot=======" << endl;
    InitializeCriticalSection(&cs); //初始化临界区
    thread th(foo);
    thread th(foo);
    InitializeCriticalSection(&cs); //初始化临界区
    std::cout << "========over=======" << endl;
}

3 互斥量

在C++11中被命名为Mutex,所有其相关的类和函数都在头文件mutex中。一共有四种互斥元类,分别是:

  • **std::mutex;**最基本的互斥元类
  • **std::recursive_mutex;**递归Mutex类(同一线程可以对互斥量多次上锁,来获得对互斥量对象的多层所有权)。
  • **std::timed_mutex;**定时Mutex类。
  • **std::recursive_timed_mutex;**定时递归Mutex类

上述四种互斥量元类都有一个成员函数lock和unlock来实现锁定与解锁的操作。

除此之外,还有两种lock类,分别是:

  • **std::lock_guard;**这个类的使用类似智能指针,可以销毁时自动解锁;
  • **std::unique_lock;**这个类与1用法相同,但提供了更灵活的上锁和解锁控制,同时也更占资源。

3.1 std::mutex类

  • 构造函数,std::mutex不允许拷贝构造,也不允许 move 拷贝,最初产生的 mutex 对象是处于 unlocked 状态的。
  • lock(),调用线程将锁住该互斥量。线程调用该函数会发生下面3 种情况:
    1. 如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用 unlock之前,该线程一直拥有该锁。
    2. 如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住。
    3. 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。
  • unlock(), 解锁,释放对互斥量的所有权。
  • try_lock(),尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程也不会被阻塞。线程调用该函数也会出现下面3 种情况:
    1. 如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直到该线程调用 unlock 释放互斥量。
    2. 如果当前互斥量被其他线程锁住,则当前调用线程返回 false,而并不会被阻塞掉。
    3. 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。
void foo()
{
    if(mtx.try_lock())
    {
         /*公共资源code*/
         mtx.unlock();
    }
}
#include <mutex>
std::mutex mtx;
int main(int, char **)
{
    std::cout << "========boot=======" << endl;
    thread th1(foo);
    thread th2(foo);
    th1.join();
    th2.join();
    std::cout << "========over=======" << endl;
}

3.2 lock_guard

#include <mutex>
std::mutex mtx;
void foo()
{
    //使用mutex对象定义一个局部的lock_guard对象
    std::lock_guard<std::mutex> lock(mtx);
    /*公共资源code*/
    //只有等到该lock_guard对象销毁后才能解锁
}

int main(int, char **)
{
    std::cout << "========boot=======" << endl;
    thread th1(foo);
    thread th2(foo);
    th1.join();
    th2.join();
    std::cout << "========over=======" << endl;
}

3.3 unique_lock

lock_guard本身并没有提供枷锁和解锁的接口,智能保证再析构的时候执行解锁操作,不够灵活。

但是unique_lock提供了lock()和unlock()接口,能记录现在处于上锁还是解锁状态,在析构的时候,会根据当前状态来决定是否需要解锁。然而这是有代价的,因为它内部需要维护锁的状态,所以效率要比lock_guard低一点,在lock_guard能解决问题的时候,就用lock_guard,反之使用unique_lock。

#include <mutex>
std::mutex mtx;
void foo()
{
    //使用mutex对象定义一个局部的lock_guard对象
    std::unique_lock<std::mutex> lock(mtx);
    /*do something1 code*/

    lock.unlock();
    /*do others */

    lock.lock();
    /*do something2 code*/
}

int main(int, char **)
{
    std::cout << "========boot=======" << endl;
    thread th1(foo);
    thread th2(foo);
    th1.join();
    th2.join();
    std::cout << "========over=======" << endl;
}

3.4 std::condition_variable

condition_variable是一个类,搭配互斥量mutex来用,这个类主要有wait函数和notify函数。程序运行到wait函数的时候会先在此阻塞,然后自动unlock,那么其他线程在拿到锁以后就会往下运行,当运行到notify函数的时候,就会唤醒wait函数,然后自动lock并继续运行。

当然wait函数还有第二个参数,这个参数接收一个布尔类型的值,当这个布尔类型的值为false的时候线程就会被阻塞在这里,只有当该线程被唤醒之后,且第二参数为true才会往下运行。

notify_one函数每次只能唤醒一个线程,那么notify_all函数的作用就是可以唤醒所有的线程,但是最终能抢夺锁的只有一个线程,或者说有多个线程在wait,但是用notify_one去唤醒其中一个线程,那么这些线程就出现了去争夺互斥量的一个情况,那么最终没有获得锁的控制权的线程就会再次回到阻塞的状态,那么对于这些没有抢到控制权的这个过程就叫做虚假唤醒。那么对于虚假唤醒的解决方法就是加一个while循环。

#include <mutex>
#include <cstdlib>
std::mutex mtx;
std::condition_variable cv;
std::queue<int> que;

void consumer()
{
    while (true)
    {
        std::unique_lock<std::mutex> lck(mtx);
        // while (que.size() == 0) //当队列为空的时候,需要等待
        // {
        //     cv.wait(lck);
        // }
        /*上述写法这样也可以。当队列不为空且线程被唤醒才可以继续执行。
        cv.wait(lck, []()
                { return que.size() != 0; });
        */
        int temp = que.front();
        std::cout << "read the first element:" << temp << " from the queue" << endl;
        que.pop();
    }
}

void producer()
{
    srand((int)time(0)); //随机种子
    while (true)
    {
        {
            std::unique_lock<std::mutex> lck(mtx);
            int temp = rand() % 100;
            que.push(temp);
            std::cout << "write an element:" << temp << " to the queue" << endl;
        }//注意这个作用域,为了就是unlock,不可以省。
        //然后再去唤醒。
        cv.notify_all();
    }
}

int main(int, char **)
{
    std::cout << "========boot=======" << endl;
    thread th1(producer);
    thread th2(consumer);
    th1.join();
    th2.join();
    std::cout << "========over=======" << endl;
}

4 信号量(Semaphore)

定义于头文件<semaphore.h>,信号量是一种轻量的同步机制,用于制约对共享资源的并发访问(控制线程的并发数量)。在可以使用两者时,信号量能比条件变量更有效率。

// 初始化
#include<semaphore.h>
int sem_init(sem_t *sem,int pshared,unsigned int value);
/*
功能 创建一个信号量并初始化它的值,一个无名信号量在被使用前必须初始化
参数 sem 信号量地址  
     pshared  等于0 信号量在线程间共享   不等于0 信号在进程间共享
     value 信号量的初始值
返回 成功 0    失败 -1

*/


// 销毁
#include<semaphore,h>
int sem_destroy(sem_t *sem);
/*
功能 删除sem标识的信号量
参数 sem 信号量地址
返回 成功 0    失败 -1
*/


// P操作(减1)
#include<semaphore.h>
int sem_wait(sem_t *sem);
/*
功能 将信号量的值减1,操作前,先检查信号量(sem)的值是否为0,若为0,则阻塞,直到信号量大于0再减
参数 sem 信号量地址
返回 成功 0   失败 -1
*/

// 非阻塞减1
int sem_trywait(sem_t *sem);
// 以非阻塞的方式来对信号量进行减1操作
// 若操作前,信号量的值等于0,则对信号量的操作失败,函数立即返回

// 限时减1
int sem_timedwait(sem_t *sem,const struct timespec *abs_timeout);
// 限时尝试将信号量的值减1
// abs_timeout 绝对时间


// V操作(加1)
#include<semaphore.h>
int sem_post(sem_t *sem);
/*
功能 将信号量的值加1,并发出信号唤醒等待线程(sem_wait());
参数 sem 信号量地址
返回 成功 0    失败 -1
*/


// 获取信号量的值
#include<semaphore.h>
int sem_getvalue(sem_t *sem,int *val);
/*
功能 获取sem标识的信号量的值,保存在val中
参数 sem 信号量地址   val 保存信号量值的地址
返回 成功 0     失败 -1
*/
//在两个函数中按顺序打印奇数偶数
#include <semaphore.h>
sem_t oddSem;
sem_t evenSem;

void getOdd()
{
    for (size_t i = 0; i < 100; i++)
    {
        if (i % 2 != 0)
        {
            sem_wait(&oddSem);//-1 操作 如果操作前为0则阻塞。
            std::cout << "threadID:" << this_thread::get_id() << " getOdd:" << i << endl;
            sem_post(&evenSem);//+1操作 通知even打印。
        }
    }
}

void getEven()
{
    for (size_t i = 0; i < 100; i++)
    {
        if (i % 2 == 0)
        {
            sem_wait(&evenSem);//-1操作,如果操作前为0则阻塞。
            std::cout << "threadID:" << this_thread::get_id() << " getEven:" << i << endl;
            sem_post(&oddSem);//+1操作,通知odd打印
        }
    }
}

int main(int, char **)
{
    std::cout << "========boot=======" << endl;

    sem_init(&oddSem,0,0);
    sem_init(&evenSem,0,1);//从0开始打印,所以初始值为1.

    thread th1(getOdd);
    thread th2(getEven);
    th1.join();
    th2.join();
    std::cout << "========over=======" << endl;
}

5 事件

HANDLE event = NULL;

void m_raise()
{
    for (size_t i = 0; i < 10; i++)
    {
        /* code */
        std::cout << "threadID:" << this_thread::get_id() <<__FUNCTION__<< i << endl;
        if(i==5)
        {
            //激活事件。
            SetEvent(event);
        }
    }
}

void receive()
{
    //如果事件为激活状态则直接执行。
    //否则阻塞直到事件被激活。
    WaitForSingleObject(event,INFINITE);
    for (size_t i = 0; i < 10; i++)
    {
        /* code */
        std::cout << "threadID:" << this_thread::get_id() <<__FUNCTION__<< i << endl;
    }
}

int main(int, char **)
{
    std::cout << "========boot=======" << endl;
    //初始化事件
    event = CreateEvent(NULL, FALSE, TRUE, NULL);  
	ResetEvent(event);//设置事件状态为未激活状态。

    thread th1(receive);
    thread th2(m_raise);
    th1.join();
    th2.join();
    std::cout << "========over=======" << endl;
}
THE END

发表回复