线程同步
同步即协同步调,多个线程对公共区域数据按序访问,防止数据混乱,产生与时间有关的错误。
线程同步,指一个线程发出某一功能调用时,在没有得到结果之前,该调用不返回。同时其他进程为保证数据一致性,不能调用该功能。
因此,当有多个控制流共同操作一个共享资源时,就需要同步
数据混乱原因
- 资源共享(独享资源则不会)
- 调度随机(意味着数据访问会出现竞争)
- 线程间缺乏必要的同步机制
上面这三点,前两点无法改变(线程调度算法由cpu决定,无法改变),因此要解决数据混乱问题只能从第三点入手,使得多个线程在访问共享资源的时候,出现互斥
建议锁
对公共数据进行保护,但建议锁本身不具备强制性,只能通过代码逻辑来加以限制。所有线程应该在访问公共数据前先拿锁再访问,但如果有一个线程的代码不遵守此流程,直接去访问公共数据,结果就是数据也能访问到(因为像全局变量这种公共数据是在所有线程间共享的),但可能会导致数据混乱
互斥量mutex
Linux中提供一把互斥锁mutex(也称之为互斥量)
每个线程在对资源操作前都尝试先加锁,成功加锁才能操作,操作结束解锁,同一时刻只能有一个线程持有该锁
资源还是共享的,线程间也还是竞争的,但通过一把锁就将资源的访问变成了互斥操作,避免产生与时间有关的错误
使用场景:一个共享资源,多个线程访问
使用mutex一般步骤:
pthread_mutex_t
类型本质是一个结构体,可将其简单看成整数,初始化后值为1
pthread_mutex_t mutex;
创建互斥锁(变量mutex
只有1、0两种取值)pthread_mutex_init(&mutex, NULL);
初始化为1pthread_mutex_lock(&mutex);
加锁做一次–操作,1→>0,阻塞线程- 访问共享数据(
stdout
) pthrad_mutext_unlock(&mutex);
解锁做一次++操作,0→1,唤醒阻塞在锁上的线程pthead_mutex_destroy(&mutex);
销毁互斥锁
注意事项:尽量保证锁的粒度越小越好(访问共享数据前加锁,访问结束后立即解锁)
trylock:尝试加锁,成功1→0,失败则返回,同时设置错误号EBUSY
初始化互斥量:pthread_mutex_t mutex;
- 动态初始化:
pthread_mutex_init(&mutex, NULL);
- 静态初始化:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
死锁
死锁不是一把锁,而是使用锁不恰当导致的现象
产生原因:
- 对一把锁反复lock
- 两个线程,各自持有一把锁,请求另一把
避免方法:
- 保证资源的获取顺序,要求每个线程获取资源的顺序一致
- 当得不到所有所需资源时,主动放弃已经获得的资源,等待
restrict关键字:用来限定指针变量,被该关键字限定的指针变量所指向的内存操作,必须由本指针完成
读写锁
锁只有一把,以读方式给数据加锁→读锁,以写方式给数据加锁→写锁
读共享,写独占,也叫共享独占锁
写锁优先级高
使用场景:适合于对数据结构读的次数远大于写
特性:
- 读写锁是“写模式加锁“时,解锁前,所有对该锁加锁的线程都会被阻塞
- 读写锁是“读模式加锁”时,如果只有以读模式的线程对其加锁会成功,如果线程以写模式加锁会阻塞
- 既有试图以写模式加锁的线程,也有试图以读模式加锁的线程,那么读写锁会阻塞随后的读模式锁请求,优先满足写模式锁。读锁、写锁并行阻塞时,写锁优先级高
相较于互斥量而言,当读线程多的时候,读写锁访问效率高
函数原型:
pthread_rwlock_t rwlock;
pthread_rwlock_init(&rwlock, NULL);
pthread_rwlock_rdlock(&rwlock); try
pthread_rwlock_wrlock(&rwlock); try
pthread_rwlock_unlock(&rwlock);
pthread_rwlock_destroy(&rwlock);
条件变量
本身不是锁 通常与互斥锁mutex配合使用
pthread_cond_t cond;
初始化条件变量:
- 动态初始化:
pthread_cond_init(&cond, NULL);
- 静态初始化:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
条件变量函数
pthread_cond_wait(&cond, &mutex); 阻塞等待一个条件变量满足
作用:
- 阻塞等待条件变量满足
- 解锁已经加锁成功的信号量,相当于调用pthread_mutex_unlock(&mutex),且1.2.两步为一个原子操作
- 当条件满足,函数返回时,解除阻塞,重新加锁信号量,相当于调用pthread_mutex_lock(&mutex);
以消费者举例,先加互斥锁,如果没有产品则阻塞等待,在等待过程中,互斥锁已解(不然生产者怎么加锁去生产),当条件变量满足,即有产品了,函数返回,消费者重新加锁,然后去消费产品,最后解锁
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime); 计时阻塞等待一个条件变量满足,时间一到则放弃等待
pthread_cond_signal():唤醒阻塞在条件变量上的(至少)一个线程
pthread_cond_broadcast():唤醒阻塞在条件变量上的所有线程(惊群效应 谨慎使用)
生成者消费者
编程练习:借助条件变量完成线程间同步,实现生成者消费者模型
//借助条件变量实现生产者消费者模型
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<fcntl.h>
#include<unistd.h>
#include<pthread.h>
#include<sys/stat.h>
#include<sys/mman.h>
#include<dirent.h>
#include<errno.h>
#include<signal.h>
//链表作为共享数据,需要被互斥锁保护
struct msg {
struct msg *next;
int num;
};
struct msg *head;
//静态初始化一个条件变量和一个互斥锁
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void *producer(void *p) {
struct msg *mp;
while(1) {
mp = malloc(sizeof(struct msg));
mp->num = rand() % 1000 + 1; //模拟生产一个产品
printf("-Produce-----------%d\n", mp->num);
pthread_mutex_lock(&lock);
mp->next = head;
head = mp;
pthread_mutex_unlock(&lock);
pthread_cond_signal(&has_product); //将等待在该条件变量上的一个线程唤醒
sleep(rand() % 5);
}
}
void *consumer(void *p) {
struct msg *mp;
while(1) {
pthread_mutex_lock(&lock);
//在这里while可以换成if,因为只有一个生产者一个消费者,但若有多个消费者就必须用while
//因为可能有其它消费者先拿到锁,吃了产品,所以要循环判断当前是否有产品
//多个消费者模型实现也很简单,在main函数里多create几个消费者线程即可
while(head == NULL) {
pthread_cond_wait(&has_product, &lock);
}
mp = head;
head = mp->next;
pthread_mutex_unlock(&lock);
printf("-Consume %lu---%d\n", pthread_self(), mp->num);
free(mp);
sleep(rand() % 5);
}
}
int main(int argc, char *argv[]) {
pthread_t pdid, csid;
srand(time(NULL));
pthread_create(&pdid, NULL, producer, NULL);
pthread_create(&csid, NULL, consumer, NULL);
pthread_join(pdid, NULL);
pthread_join(csid, NULL);
return 0;
}
信号量
互斥量虽然保证了多线程操作时的数据正确性,但每次只能一个线程访问共享数据,线程从并行执行变成了串行执行,导致线程的并发性大大下降
信号量是相对折中的一种处理方式,既能保证同步,数据不混乱,又能提高线程并发量(信号量和信号毫无关联)
信号量相当于初始化值为N的互斥量(N表示可以同时访问共享数据区的线程数),应用于线程、进程间同步
相关函数:
sem_t sem;
定义信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
初始化
参数:
- sem:信号量
- pshared:
- 0:用于线程间同步
- 1:用于进程间同步
- value:N值(指定同时访问的线程数)
sem_wait();
一次调用,做一次- -操作, 当信号量的值为 0 时,再次- -就会阻塞(相当于pthread_mutex_lock
)sem_post();
一次调用,做一次++操作. 当信号量的值为 N 时, 再次 ++ 就会阻塞(相当于pthread_mutex_unlock
)
sem_destroy();
销毁信号量
生成者消费者
编程练习:借助信号量完成线程间同步,实现生产者消费者模型
//信号量实现生产者消费者问题
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<fcntl.h>
#include<unistd.h>
#include<pthread.h>
#include<sys/stat.h>
#include<sys/mman.h>
#include<dirent.h>
#include<errno.h>
#include<signal.h>
#include<semaphore.h>
#define NUM 5
int queue[NUM]; //全局数组实现环形队列
sem_t blank_num, product_num; //空格子信号量 产品信号量
void *producer(void *arg) {
int i = 0;
while(1) {
sem_wait(&blank_num); //生产者将空格数-- 为0则阻塞等待
queue[i] = rand() % 1000 + 1; //模拟生产一个产品
printf("---Produce---%d\n", queue[i]);
sem_post(&product_num); //将产品数++
i = (i+1) % NUM; //借助下标实现环形
sleep(rand() % 1);
}
}
void *consumer(void *arg) {
int i = 0;
while(1) {
sem_wait(&product_num); //消费者将产品数-- 为0则阻塞等待
printf("-Consume-%d\n", queue[i]);
queue[i] = 0; //模拟消费一个产品
sem_post(&blank_num); //空格数++
i = (i+1) % NUM;
sleep(rand() % 3);
}
}
int main(int argc, char *argv[]) {
pthread_t pdid, csid;
sem_init(&blank_num, 0, NUM); //参2为0表示线程间共享 参3表示初始化空格子信号量为5
sem_init(&product_num, 0, 0); //产品数为0
pthread_create(&pdid, NULL, producer, NULL);
pthread_create(&csid, NULL, consumer, NULL);
pthread_join(pdid, NULL);
pthread_join(csid, NULL);
sem_destroy(&blank_num);
sem_destroy(&product_num);
return 0;
}