forked from AllenDowney/ThinkOS
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue_mutex.c
More file actions
191 lines (158 loc) · 3.61 KB
/
queue_mutex.c
File metadata and controls
191 lines (158 loc) · 3.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/* Example code for Think OS.
Copyright 2015 Allen Downey
License: Creative Commons Attribution-ShareAlike 3.0
*/
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <pthread.h>
#include "utils.h"
#define NUM_CHILDREN 2
#define QUEUE_LENGTH 16
// QUEUE
// Fixed-size circular buffer of ints, protected by a mutex.
typedef struct {
// backing storage; make_queue allocates it with `length` slots
int *array;
// number of slots in array; queue_full reports "full" while one slot is
// still free, so at most length-1 items are stored at a time
int length;
// index of the slot the next queue_push writes to
int next_in;
// index of the slot the next queue_pop reads from
int next_out;
// locked by queue_push and queue_pop around their accesses to the fields above
Mutex *mutex;
} Queue;
/* Allocates and initializes an empty circular queue.
 *
 * length: number of slots in the backing array; must be positive.
 *         queue_full treats the queue as full while one slot is still
 *         free, so the queue stores at most length-1 items.
 *
 * returns: pointer to a new Queue with next_in == next_out == 0 and a
 *          mutex obtained from make_mutex().
 *
 * Exits the process with a diagnostic if length is not positive or if
 * either allocation fails, rather than dereferencing a NULL pointer or
 * later taking a modulus by zero in queue_incr.
 */
Queue *make_queue(int length)
{
    if (length <= 0) {
        fprintf(stderr, "make_queue: length must be positive\n");
        exit(EXIT_FAILURE);
    }

    Queue *queue = malloc(sizeof *queue);
    if (queue == NULL) {
        perror("malloc failed");
        exit(EXIT_FAILURE);
    }

    // length is known to be positive here, so the size_t cast is safe
    queue->array = malloc((size_t) length * sizeof *queue->array);
    if (queue->array == NULL) {
        perror("malloc failed");
        exit(EXIT_FAILURE);
    }

    queue->length = length;
    queue->next_in = 0;
    queue->next_out = 0;
    queue->mutex = make_mutex();
    return queue;
}
/* Returns the index that follows i in the circular buffer, i.e. i+1
 * reduced modulo the number of slots.
 *
 * NOTE: you must hold the mutex to call this function.
 */
int queue_incr(Queue *queue, int i)
{
    const int successor = i + 1;

    return successor % queue->length;
}
/* Returns nonzero when the queue holds no items, which is the case
 * exactly when the read and write indices coincide.
 *
 * NOTE: you must hold the mutex to call this function.
 */
int queue_empty(Queue *queue)
{
    return queue->next_out == queue->next_in;
}
/* Returns nonzero when no further item can be pushed: advancing the
 * write index by one slot would make it collide with the read index.
 *
 * NOTE: you must hold the mutex to call this function.
 */
int queue_full(Queue *queue)
{
    const int slot_after_in = queue_incr(queue, queue->next_in);

    return slot_after_in == queue->next_out;
}
/* Appends item to the queue while holding the queue's mutex.
 *
 * If the queue is full, the mutex is released and
 * perror_exit("queue is full") is called; this function does not wait
 * for space to become available.
 */
void queue_push(Queue *queue, int item)
{
    mutex_lock(queue->mutex);

    const int no_room = queue_full(queue);
    if (no_room) {
        // release the lock before reporting the error
        mutex_unlock(queue->mutex);
        perror_exit("queue is full");
    }

    const int slot = queue->next_in;
    queue->array[slot] = item;
    queue->next_in = queue_incr(queue, slot);

    mutex_unlock(queue->mutex);
}
/* Removes and returns the oldest item in the queue while holding the
 * queue's mutex.
 *
 * If the queue is empty, the mutex is released and
 * perror_exit("queue is empty") is called; this function does not wait
 * for an item to arrive.
 */
int queue_pop(Queue *queue)
{
    mutex_lock(queue->mutex);

    const int nothing_to_read = queue_empty(queue);
    if (nothing_to_read) {
        // release the lock before reporting the error
        mutex_unlock(queue->mutex);
        perror_exit("queue is empty");
    }

    const int slot = queue->next_out;
    const int oldest = queue->array[slot];
    queue->next_out = queue_incr(queue, slot);

    mutex_unlock(queue->mutex);
    return oldest;
}
// SHARED
// State handed to every thread as its entry-point argument.
typedef struct {
// the queue the producer pushes to and the consumer pops from
Queue *queue;
} Shared;
/* Allocates the Shared structure via check_malloc and gives it a new
 * queue with QUEUE_LENGTH slots.
 *
 * returns: pointer to the newly allocated Shared.
 */
Shared *make_shared()
{
    Shared *state = check_malloc(sizeof *state);

    state->queue = make_queue(QUEUE_LENGTH);
    return state;
}
// THREAD
/* Starts a new thread with default attributes.
 *
 * entry:  function the thread runs.
 * shared: pointer passed to entry as its argument.
 *
 * returns: the handle of the created thread.
 *
 * Calls perror_exit("pthread_create failed") if pthread_create returns
 * a nonzero status.
 */
pthread_t make_thread(void *(*entry)(void *), Shared *shared)
{
    pthread_t handle;
    const int status = pthread_create(&handle, NULL, entry, (void *) shared);

    if (status == 0) {
        return handle;
    }

    perror_exit("pthread_create failed");
    return handle;
}
/* Waits for the given thread to finish, discarding its exit value.
 *
 * thread: handle of the thread to join.
 *
 * pthread_join reports failure by returning a nonzero error number (it
 * never returns -1), so any nonzero result is treated as an error, in
 * the same way make_thread checks the result of pthread_create.
 */
void join_thread(pthread_t thread)
{
    int ret = pthread_join(thread, NULL);
    if (ret != 0) {
        perror_exit("pthread_join failed");
    }
}
// PRODUCER-CONSUMER
/* Thread entry point for the producer.
 *
 * arg: pointer to the Shared structure.
 *
 * Announces and then pushes the integers 0 .. QUEUE_LENGTH-1 onto the
 * shared queue, in order, and ends the thread with pthread_exit(NULL).
 */
void *producer_entry(void *arg)
{
    Shared *shared = arg;
    int value = 0;

    while (value < QUEUE_LENGTH) {
        printf("adding item %d\n", value);
        queue_push(shared->queue, value);
        value++;
    }

    pthread_exit(NULL);
}
/* Thread entry point for the consumer.
 *
 * arg: pointer to the Shared structure.
 *
 * Pops QUEUE_LENGTH items from the shared queue, printing each one,
 * and ends the thread with pthread_exit(NULL).
 */
void *consumer_entry(void *arg)
{
    Shared *shared = arg;
    int remaining = QUEUE_LENGTH;

    while (remaining > 0) {
        int popped = queue_pop(shared->queue);
        printf("consuming item %d\n", popped);
        remaining--;
    }

    pthread_exit(NULL);
}
// TEST CODE
/* Single-threaded self-check of the queue operations.
 *
 * Fills a 128-slot queue to capacity, drains and refills ten items, and
 * asserts the empty/full state and FIFO ordering at each stage.
 */
void queue_test()
{
    enum { LENGTH = 128, BATCH = 10 };
    int i;
    int item;
    Queue *queue = make_queue(LENGTH);

    assert(queue_empty(queue));

    // push 0 .. LENGTH-2, which is as many items as the queue accepts
    i = 0;
    while (i < LENGTH - 1) {
        queue_push(queue, i);
        i++;
    }
    assert(queue_full(queue));

    // the first BATCH items must come back in insertion order
    i = 0;
    while (i < BATCH) {
        item = queue_pop(queue);
        assert(i == item);
        i++;
    }
    assert(!queue_empty(queue));
    assert(!queue_full(queue));

    // refill the slots that were just freed
    i = 0;
    while (i < BATCH) {
        queue_push(queue, i);
        i++;
    }
    assert(queue_full(queue));

    // the next BATCH pops yield 10 .. 19 from the initial fill
    i = 0;
    while (i < BATCH) {
        item = queue_pop(queue);
        i++;
    }
    assert(item == 19);
}
/* Creates the shared state, starts one producer thread and one consumer
 * thread on it, and waits for both to finish.
 *
 * NOTE(review): queue_push and queue_pop call perror_exit on a full or
 * empty queue instead of waiting, so whether the run completes appears
 * to depend on how the two threads are scheduled.
 */
int main()
{
    Shared *shared = make_shared();
    pthread_t child[NUM_CHILDREN];

    child[0] = make_thread(producer_entry, shared);
    child[1] = make_thread(consumer_entry, shared);

    int joined = 0;
    while (joined < NUM_CHILDREN) {
        join_thread(child[joined]);
        joined++;
    }

    return 0;
}