线程间的同步方法大体可以分为两类:用户模式和内核模式。内核模式就是指利用系统内核对象的单一性来进行同步,使用时需要切换内核态与用户态,而用户模式就是不需要切换到内核态,只在用户态完成操作。
**临界区:**通过对多线程的串行化来访问公共资源或一段代码、速度快,适合控制数据访问。
**互斥量:**为协调共同对一个共享资源的单独访问而设计的。
**信号量:**为控制一个具有有限数量用户资源而设计的。
**事件:**用来通知线程有一些时间已发生,从而启动后继任务的开始。
atomic<int> num{0};
int main(int, char **)
{
std::cout << "========boot=======" << endl;
//检查是否无锁的
std::cout << "num.is_lock_free():" << num.is_lock_free() << endl;
num.store(10); //存储值
std::cout << "num:" << num.load() << endl; //读取值
int a = num.exchange(100); //交换值,返回原来的值
std::cout << "num:" << num.load() << endl;
std::cout << "========over=======" << endl;
}
atomic并不能保证类型T是无锁的,另外不同平台的处理器处理方式不同,也不能保证必定无锁,所以该类型都会有is_lock_free() 函数来判断是否无锁。
有一个比较特殊的原子类型是atomic_flag,因为atomic_flag与其他原子类型不同,它是无锁的,即线程对其访问不需要加锁,而其他的原子类型不一定是无锁的。
atomic_flag flag = ATOMIC_FLAG_INIT; //初始化
int main(int, char **)
{
std::cout << "========boot=======" << endl;
//之前还未设置标志,所以调用的时候返回false,然后设置了标志
std::cout << "ret:" << flag.test_and_set() << endl;
//再次调用的时候已经设置过标志,所以返回true
std::cout << "ret:" << flag.test_and_set() << endl;
flag.clear(); //清除标志
//清除标志后,返回值为false,并且设置了标志
std::cout << "ret:" << flag.test_and_set() << endl;
//设置标志后,返回值为true
std::cout << "ret:" << flag.test_and_set() << endl;
std::cout << "========over=======" << endl;
}
在Linux平台下,没有临界区的概念。
#include <windows.h>
CRITICAL_SECTION cs; //定义临界区对象
void foo()
{
EnterCriticalSection(&cs);
/* code 公共资源代码*/
LeaveCriticalSection(&cs);
}
int main(int, char **)
{
std::cout << "========boot=======" << endl;
InitializeCriticalSection(&cs); //初始化临界区
thread th(foo);
thread th(foo);
InitializeCriticalSection(&cs); //初始化临界区
std::cout << "========over=======" << endl;
}
在C++11中被命名为Mutex,所有其相关的类和函数都在头文件mutex中。一共有四种互斥元类,分别是:
上述四种互斥量元类都有一个成员函数lock和unlock来实现锁定与解锁的操作。
除此之外,还有两种lock类,分别是:
std::mutex
不允许拷贝构造,也不允许 move 拷贝,最初产生的 mutex 对象是处于 unlocked 状态的。unlock()
, 解锁,释放对互斥量的所有权。void foo()
{
if(mtx.try_lock())
{
/*公共资源code*/
mtx.unlock();
}
}
#include <mutex>
std::mutex mtx;
int main(int, char **)
{
std::cout << "========boot=======" << endl;
thread th1(foo);
thread th2(foo);
th1.join();
th2.join();
std::cout << "========over=======" << endl;
}
#include <mutex>
std::mutex mtx;
void foo()
{
//使用mutex对象定义一个局部的lock_guard对象
std::lock_guard<std::mutex> lock(mtx);
/*公共资源code*/
//只有等到该lock_guard对象销毁后才能解锁
}
int main(int, char **)
{
std::cout << "========boot=======" << endl;
thread th1(foo);
thread th2(foo);
th1.join();
th2.join();
std::cout << "========over=======" << endl;
}
lock_guard本身并没有提供枷锁和解锁的接口,智能保证再析构的时候执行解锁操作,不够灵活。
但是unique_lock提供了lock()和unlock()接口,能记录现在处于上锁还是解锁状态,在析构的时候,会根据当前状态来决定是否需要解锁。然而这是有代价的,因为它内部需要维护锁的状态,所以效率要比lock_guard低一点,在lock_guard能解决问题的时候,就用lock_guard,反之使用unique_lock。
#include <mutex>
std::mutex mtx;
void foo()
{
//使用mutex对象定义一个局部的lock_guard对象
std::unique_lock<std::mutex> lock(mtx);
/*do something1 code*/
lock.unlock();
/*do others */
lock.lock();
/*do something2 code*/
}
int main(int, char **)
{
std::cout << "========boot=======" << endl;
thread th1(foo);
thread th2(foo);
th1.join();
th2.join();
std::cout << "========over=======" << endl;
}
condition_variable是一个类,搭配互斥量mutex来用,这个类主要有wait函数和notify函数。程序运行到wait函数的时候会先在此阻塞,然后自动unlock,那么其他线程在拿到锁以后就会往下运行,当运行到notify函数的时候,就会唤醒wait函数,然后自动lock并继续运行。
当然wait函数还有第二个参数,这个参数接收一个布尔类型的值,当这个布尔类型的值为false的时候线程就会被阻塞在这里,只有当该线程被唤醒之后,且第二参数为true才会往下运行。
notify_one函数每次只能唤醒一个线程,那么notify_all函数的作用就是可以唤醒所有的线程,但是最终能抢夺锁的只有一个线程,或者说有多个线程在wait,但是用notify_one去唤醒其中一个线程,那么这些线程就出现了去争夺互斥量的一个情况,那么最终没有获得锁的控制权的线程就会再次回到阻塞的状态,那么对于这些没有抢到控制权的这个过程就叫做虚假唤醒。那么对于虚假唤醒的解决方法就是加一个while循环。
#include <mutex>
#include <cstdlib>
std::mutex mtx;
std::condition_variable cv;
std::queue<int> que;
void consumer()
{
while (true)
{
std::unique_lock<std::mutex> lck(mtx);
// while (que.size() == 0) //当队列为空的时候,需要等待
// {
// cv.wait(lck);
// }
/*上述写法这样也可以。当队列不为空且线程被唤醒才可以继续执行。
cv.wait(lck, []()
{ return que.size() != 0; });
*/
int temp = que.front();
std::cout << "read the first element:" << temp << " from the queue" << endl;
que.pop();
}
}
void producer()
{
srand((int)time(0)); //随机种子
while (true)
{
{
std::unique_lock<std::mutex> lck(mtx);
int temp = rand() % 100;
que.push(temp);
std::cout << "write an element:" << temp << " to the queue" << endl;
}//注意这个作用域,为了就是unlock,不可以省。
//然后再去唤醒。
cv.notify_all();
}
}
int main(int, char **)
{
std::cout << "========boot=======" << endl;
thread th1(producer);
thread th2(consumer);
th1.join();
th2.join();
std::cout << "========over=======" << endl;
}
定义于头文件<semaphore.h>,信号量是一种轻量的同步机制,用于制约对共享资源的并发访问(控制线程的并发数量)。在可以使用两者时,信号量能比条件变量更有效率。
// 初始化
#include<semaphore.h>
int sem_init(sem_t *sem,int pshared,unsigned int value);
/*
功能 创建一个信号量并初始化它的值,一个无名信号量在被使用前必须初始化
参数 sem 信号量地址
pshared 等于0 信号量在线程间共享 不等于0 信号在进程间共享
value 信号量的初始值
返回 成功 0 失败 -1
*/
// 销毁
#include<semaphore,h>
int sem_destroy(sem_t *sem);
/*
功能 删除sem标识的信号量
参数 sem 信号量地址
返回 成功 0 失败 -1
*/
// P操作(减1)
#include<semaphore.h>
int sem_wait(sem_t *sem);
/*
功能 将信号量的值减1,操作前,先检查信号量(sem)的值是否为0,若为0,则阻塞,直到信号量大于0再减
参数 sem 信号量地址
返回 成功 0 失败 -1
*/
// 非阻塞减1
int sem_trywait(sem_t *sem);
// 以非阻塞的方式来对信号量进行减1操作
// 若操作前,信号量的值等于0,则对信号量的操作失败,函数立即返回
// 限时减1
int sem_timedwait(sem_t *sem,const struct timespec *abs_timeout);
// 限时尝试将信号量的值减1
// abs_timeout 绝对时间
// V操作(加1)
#include<semaphore.h>
int sem_post(sem_t *sem);
/*
功能 将信号量的值加1,并发出信号唤醒等待线程(sem_wait());
参数 sem 信号量地址
返回 成功 0 失败 -1
*/
// 获取信号量的值
#include<semaphore.h>
int sem_getvalue(sem_t *sem,int *val);
/*
功能 获取sem标识的信号量的值,保存在val中
参数 sem 信号量地址 val 保存信号量值的地址
返回 成功 0 失败 -1
*/
//在两个函数中按顺序打印奇数偶数
#include <semaphore.h>
sem_t oddSem;
sem_t evenSem;
void getOdd()
{
for (size_t i = 0; i < 100; i++)
{
if (i % 2 != 0)
{
sem_wait(&oddSem);//-1 操作 如果操作前为0则阻塞。
std::cout << "threadID:" << this_thread::get_id() << " getOdd:" << i << endl;
sem_post(&evenSem);//+1操作 通知even打印。
}
}
}
void getEven()
{
for (size_t i = 0; i < 100; i++)
{
if (i % 2 == 0)
{
sem_wait(&evenSem);//-1操作,如果操作前为0则阻塞。
std::cout << "threadID:" << this_thread::get_id() << " getEven:" << i << endl;
sem_post(&oddSem);//+1操作,通知odd打印
}
}
}
int main(int, char **)
{
std::cout << "========boot=======" << endl;
sem_init(&oddSem,0,0);
sem_init(&evenSem,0,1);//从0开始打印,所以初始值为1.
thread th1(getOdd);
thread th2(getEven);
th1.join();
th2.join();
std::cout << "========over=======" << endl;
}
HANDLE event = NULL;
void m_raise()
{
for (size_t i = 0; i < 10; i++)
{
/* code */
std::cout << "threadID:" << this_thread::get_id() <<__FUNCTION__<< i << endl;
if(i==5)
{
//激活事件。
SetEvent(event);
}
}
}
void receive()
{
//如果事件为激活状态则直接执行。
//否则阻塞直到事件被激活。
WaitForSingleObject(event,INFINITE);
for (size_t i = 0; i < 10; i++)
{
/* code */
std::cout << "threadID:" << this_thread::get_id() <<__FUNCTION__<< i << endl;
}
}
int main(int, char **)
{
std::cout << "========boot=======" << endl;
//初始化事件
event = CreateEvent(NULL, FALSE, TRUE, NULL);
ResetEvent(event);//设置事件状态为未激活状态。
thread th1(receive);
thread th2(m_raise);
th1.join();
th2.join();
std::cout << "========over=======" << endl;
}
上一篇
下一篇