0%

unix环境高级编程 - 线程同步

最近在学习 APUE,所以顺便将每日所学记录下来,一方面为了巩固学习的知识,另一方面也为同样在学习APUE的童鞋们提供一份参考。

本系列博文均根据学习《UNIX环境高级编程》一书总结而来;

运行环境:

  • 操作系统: ubutnu 16.04
  • 编译器:QtCreator CLion 2020.3

线程同步

概念

线程同步,指一个线程发出某一功能调用时,在没有得到结果之前,该调用不返回。同时其它线程为保证数据一致性,不能调用该功能。

互斥锁

pthread_mutex_t

  • 其本质是一个结构体,为简化理解,应用时可忽略其实现细节,简单当成整数看待。

  • pthread_mutex_t mutex; 变量mutex只有两种取值1、0。

pthread_mutex_init

初始化一个互斥锁(互斥量) —> 初值可看作1

1
2
3
int pthread_mutex_init(pthread_mutex_t *restrict mutex, 
const pthread_mutexattr_t *restrict attr);

  • mutex:传出参数,调用时应传 &mutex

  • attr:互斥锁属性。是一个传入参数,通常传NULL,选用默认属性(线程间共享)。

restrict**关键字**:只用于限制指针,告诉编译器,所有修改该指针指向内存中内容的操作,只能通过本指针完成。不能通过除本指针以外的其他变量或指针修改互斥量mutex的两种初始化方式:

  • 静态初始化:如果互斥锁 mutex 是静态分配的(定义在全局,或加了static关键字修饰),可以直接使用宏进行初始化。

pthead_mutex_t muetx = PTHREAD_MUTEX_INITIALIZER;

  • 动态初始化:局部变量应采用动态初始化。

pthread_mutex_init(&mutex, NULL)

pthread_destory

销毁一个互斥锁

1
int pthread_mutex_destroy(pthread_mutex_t *mutex);

pthread_mutex_lock

对互斥所加锁,可理解为将mutex–

1
int pthread_mutex_lock(pthread_mutex_t *mutex);

pthread_mutex_unlock

对互斥所解锁,可理解为将mutex ++

1
int pthread_mutex_unlock(pthread_mutex_t *mutex);

pthread_mutex_trylock

尝试加锁

1
int pthread_mutex_trylock(pthread_mutex_t *mutex);

demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* @file mutex使用
*
* 示例程序 - 02_pthread_mutex.c
*
* @author Steve & sYstemk1t
*
*/


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <signal.h>
#include <sys/time.h>
#include <time.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <signal.h>
#include <pthread.h>
#define NUM 20000
int number = 0;

//定义锁
pthread_mutex_t mutex;

//线程处理函数
void* ThreadProc1(void *arg)
{
int nCount = 0;
for (int i = 0; i < NUM; ++i) {
pthread_mutex_lock(&mutex);
nCount = number;
nCount++;
number = nCount;
printf("thread1 number ==%d\n",number);
pthread_mutex_unlock(&mutex);
}

}

void* ThreadProc2(void *arg)
{
int nCount = 0;
for (int i = 0; i < NUM; ++i) {
pthread_mutex_lock(&mutex);
nCount = number;
nCount++;
number = nCount;
printf("thread2 number ==%d\n",number);
pthread_mutex_unlock(&mutex);
}

}

int main()
{
//互斥锁初始化
pthread_mutex_init(&mutex,NULL);
pthread_t thread[2];

int nRet = 0;
nRet = pthread_create(&thread[0],NULL,ThreadProc1,NULL);
if(nRet != 0)
{
printf("pthread_create[0] error, [%s]\n",strerror(nRet));
return -1;
}

nRet = pthread_create(&thread[1],NULL,ThreadProc2,NULL);
if(nRet != 0)
{
printf("pthread_create[1] error, [%s]\n",strerror(nRet));
return -1;
}
printf("child Thread, pid == [%d] id == [%ld]\n",getpid(),pthread_self());
pthread_join(thread[0],NULL);
pthread_join(thread[1],NULL);
//释放互斥锁
pthread_mutex_destroy(&mutex);
return 0;
}

死锁

读写锁

初始化读写锁

pthread_rwlock_init 语法

读取读写锁中的锁

pthread_rwlock_rdlock 语法

读取非阻塞读写锁中的锁

pthread_rwlock_tryrdlock 语法

写入读写锁中的锁

pthread_rwlock_wrlock 语法

写入非阻塞读写锁中的锁

pthread_rwlock_trywrlock 语法

解除锁定读写锁

pthread_rwlock_unlock 语法

销毁读写锁

pthread_rwlock_destroy 语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* @file mutex使用
*
* 示例程序 - 02_pthread_mutex.c
*
* @author Steve & sYstemk1t
*
*/


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <signal.h>
#include <sys/time.h>
#include <time.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <signal.h>
#include <pthread.h>

int g_number = 0;
//定义读写锁
pthread_rwlock_t rwlock;
void *ThreadProc_Write(void *arg)
{
int nCount = 0;
int i = *(int *)arg;
while (1)
{
pthread_rwlock_wrlock(&rwlock); //加写锁
nCount = g_number;
nCount++;

g_number = nCount;
printf("[%d]-W:[%d]\n",i,nCount);
pthread_rwlock_unlock(&rwlock); //解锁
sleep(rand() % 3);
}
}

void *ThreadProc_Read(void *arg)
{
int i = *(int *)arg;
int nCount = 0;
while (1)
{
pthread_rwlock_rdlock(&rwlock);
nCount = g_number;
printf("[%d]-R:[%d]\n",i,nCount);
pthread_rwlock_unlock(&rwlock); //解锁
sleep(rand() % 3);
}
}

int main()
{
pthread_rwlock_init(&rwlock,NULL);
//创建子线程
pthread_t pthread[8];
int n = 8;
int nAddar[8];
//创建三个写线程
for (int i = 0; i < 3; ++i) {
nAddar[i] = i;
pthread_create(&pthread[i],NULL,ThreadProc_Write,&nAddar[i]);
}
//创建五个读线程
for (int i = 3; i < 8; ++i) {
nAddar[i] = i;
pthread_create(&pthread[i],NULL,ThreadProc_Read,&nAddar[i]);
}
//回收资源
for (int i = 0; i < 8; ++i) {
pthread_join(pthread[i],NULL);
}
pthread_rwlock_destroy(&rwlock);
return 0;
}

条件变量

  • 条件本身不是锁!但它也可以造成线程阻塞。通常与互斥锁配合使用。给多线程提供一个会合的场所。
    • 使用互斥量保护共享数据;
    • 使用条件变量可以使线程阻塞, 等待某个条件的发生, 当条件满足的时候解除阻塞.

条件本身不是锁!但它也可以造成线程阻塞。通常与互斥锁配合使用。给多线程提供一个会合的场所。

pthread_cond_t

定义一个条件变量

pthread_cond_init

初始化条件变量

1
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);

pthread_cond_distroy

销毁条件变量

pthread_cond_wait

等待条件变量

1
2
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);

pthread_cond_signal()

发送一个信号给另外一个正在处于阻塞等待状态的线程,使其脱离阻塞状态,继续执行;

1
int pthread_cond_signal(pthread_cond_t *cond);

pthread_cond_broadcast

激发所有的等待线程,使其全部开始执行;

1
int pthread_cond_broadcast(pthread_cond_t *cond);

demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
typedef struct node
{
int data;
struct node *next;
}NODE;

NODE *head = NULL;

//定义一把锁
pthread_mutex_t mutex;

//定义条件变量
pthread_cond_t cond;

//生产者线程
void *producer(void *arg)
{
NODE *pNode = NULL;
while(1)
{
//生产一个节点
pNode = (NODE *)malloc(sizeof(NODE));
if(pNode==NULL)
{
perror("malloc error");
exit(-1);
}
pNode->data = rand()%1000;
printf("P:[%d]\n", pNode->data);

//加锁
pthread_mutex_lock(&mutex);

pNode->next = head;
head = pNode;

//解锁
pthread_mutex_unlock(&mutex);

//通知消费者线程解除阻塞
pthread_cond_signal(&cond);

sleep(rand()%3);
}
}


//消费者线程
void *consumer(void *arg)
{
NODE *pNode = NULL;
while(1)
{
//加锁
pthread_mutex_lock(&mutex);

if(head==NULL)
{
//若条件不满足,需要阻塞等待
//若条件不满足,则阻塞等待并解锁;
//若条件满足(被生成者线程调用pthread_cond_signal函数通知),解除阻塞并加锁
pthread_cond_wait(&cond, &mutex);
}

printf("C:[%d]\n", head->data);
pNode = head;
head = head->next;

//解锁
pthread_mutex_unlock(&mutex);

free(pNode);
pNode = NULL;

sleep(rand()%3);
}
}

int main()
{
int ret;
pthread_t thread1;
pthread_t thread2;

//初始化互斥锁
pthread_mutex_init(&mutex, NULL);

//条件变量初始化
pthread_cond_init(&cond, NULL);

//创建生产者线程
ret = pthread_create(&thread1, NULL, producer, NULL);
if(ret!=0)
{
printf("pthread_create error, [%s]\n", strerror(ret));
return -1;
}

//创建消费者线程
ret = pthread_create(&thread2, NULL, consumer, NULL);
if(ret!=0)
{
printf("pthread_create error, [%s]\n", strerror(ret));
return -1;
}

//等待线程结束
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);

//释放互斥锁
pthread_mutex_destroy(&mutex);

//释放条件变量
pthread_cond_destroy(&cond);

return 0;
}

信号量

信号量相当于多把锁, 可以理解为是加强版的互斥锁;

sem_t

信号量变量

sem_init

初始化信号量;

1
int sem_init(sem_t *sem, int pshared, unsigned int value);	

sem: 信号量变量
pshared: 0表示线程同步, 1表示进程同步
value: 最多有几个线程操作共享数据

sem_wait

调用该函数一次, 相当于sem–, 当sem为0的时候, 引起阻塞

1
int sem_wait(sem_t *sem);

sem_post

调用一次, 相当于sem++
调用信号量

1
int sem_post(sem_t *sem);

sem_trywait

尝试加锁, 若失败直接返回, 不阻塞

sem_destroy

销毁信号量

demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
typedef struct node
{
int data;
struct node *next;
}NODE;

NODE *head = NULL;

//定义信号量
sem_t sem_producer;
sem_t sem_consumer;

//生产者线程
void *producer(void *arg)
{
NODE *pNode = NULL;
while(1)
{
//生产一个节点
pNode = (NODE *)malloc(sizeof(NODE));
if(pNode==NULL)
{
perror("malloc error");
exit(-1);
}
pNode->data = rand()%1000;
printf("P:[%d]\n", pNode->data);

//加锁
sem_wait(&sem_producer); //--

pNode->next = head;
head = pNode;

//解锁
sem_post(&sem_consumer); //相当于++

sleep(rand()%3);
}
}


//消费者线程
void *consumer(void *arg)
{
NODE *pNode = NULL;
while(1)
{
//加锁
sem_wait(&sem_consumer); //相当于--

printf("C:[%d]\n", head->data);
pNode = head;
head = head->next;

//解锁
sem_post(&sem_producer); //相当于++

free(pNode);
pNode = NULL;

sleep(rand()%3);
}
}

int main()
{
int ret;
pthread_t thread1;
pthread_t thread2;

//初始化信号量
sem_init(&sem_producer, 0, 5);
sem_init(&sem_consumer, 0, 0);

//创建生产者线程
ret = pthread_create(&thread1, NULL, producer, NULL);
if(ret!=0)
{
printf("pthread_create error, [%s]\n", strerror(ret));
return -1;
}

//创建消费者线程
ret = pthread_create(&thread2, NULL, consumer, NULL);
if(ret!=0)
{
printf("pthread_create error, [%s]\n", strerror(ret));
return -1;
}

//等待线程结束
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);

//释放信号量资源
sem_destroy(&sem_producer);
sem_destroy(&sem_consumer);

return 0;
}