-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmadt.c
194 lines (182 loc) · 5.76 KB
/
madt.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include "madt.h"
// Constructor
mbtree_int_node_t* new_mbtree_int_node(int value) {
mbtree_int_node_t *new_node = (mbtree_int_node_t *)malloc(sizeof(mbtree_int_node_t));
new_node->value = value;
new_node->left = NULL;
new_node->right = NULL;
return new_node;
}
// Destructor
void free_mbtree_int_node(mbtree_int_node_t *node) {
if (node != NULL) {
free(node);
}
}
void mbtree_int_inorder_traversal(mbtree_int_node_t *root) {
if (root == NULL) {
return;
} else {
mbtree_int_inorder_traversal(root->left);
printf("%d\n", root->value);
mbtree_int_inorder_traversal(root->left);
}
}
/*
* The number of dequeue operations on the queue that are done before
* performing a vacuum operation to reclaim memory.
*/
const uint64_t GC_COUNT = 100;
// Internal version of mqueue_size, which assumes that the mutex is
// already acquired.
uint64_t mqueue_size_int(mqueue_t *queue) {
return queue->rear+1 - queue->front;
}
uint64_t mqueue_size(mqueue_t *queue) {
#ifdef MIKELIBC_THREADS
mtx_lock(&(queue->mutex));
#endif
uint64_t size = mqueue_size_int(queue);
#ifdef MIKELIBC_THREADS
mtx_unlock(&(queue->mutex));
#endif
return size;
}
/*
* Methods for the mqueue_t.
*/
void mqueue_init(mqueue_t *queue,
uint64_t initial_size,
uint64_t max_size,
char *description)
{
mdbgf("%s: mqueue_init: initial_size %d and max_size %d\n",
description, initial_size, max_size);
if (initial_size < 1) {
initial_size = 1;
}
queue->front = 0;
queue->rear = -1;
queue->alloc_size = initial_size;
queue->max_size = max_size;
queue->gc_run = GC_COUNT;
bzero(queue->description, QUEUE_NAME_SIZE);
if (description != NULL) {
strncpy(queue->description, description, QUEUE_NAME_SIZE);
}
size_t data_size = queue->alloc_size * sizeof(void *);
mdbgf("%s: mallocing %d bytes of memory for the mqueue\n",
queue->description, data_size);
queue->data = malloc(data_size);
assert( queue->data != NULL );
#ifdef MIKELIBC_THREADS
mdbgf("%s: initializing thread primitives\n", queue->description);
mtx_init(&(queue->mutex), mtx_plain);
cnd_init(&(queue->full));
cnd_init(&(queue->empty));
#endif
}
void mqueue_destroy(mqueue_t *queue) {
free(queue->data);
#ifdef MIKELIBC_THREADS
mtx_destroy(&(queue->mutex));
cnd_destroy(&(queue->full));
cnd_destroy(&(queue->empty));
#endif
}
// Always enqueue to the end of the array, and grow if required.
uint64_t mqueue_enqueue(mqueue_t *queue, void *item) {
mdbgf("%s: enqueuing item\n", queue->description);
#ifdef MIKELIBC_THREADS
mdbgf("%s: locking mutex for enqueue\n", queue->description);
mtx_lock(&(queue->mutex));
mdbgf("%s: got the lock\n", queue->description);
if (queue->max_size > 0) {
while (mqueue_size_int(queue) == queue->max_size) {
mdbgf("%s: queue at max size of %d - waiting\n",
queue->description, queue->max_size);
cnd_wait(&(queue->full), &(queue->mutex));
}
#else
if (queue->max_size > 0) {
// Check for maximum size
if (mqueue_size_int(queue) == queue->max_size) {
return -1;
}
#endif
}
// Is there enough allocated space on the array?
queue->rear++;
if (queue->rear == queue->alloc_size) {
// Double the size.
size_t data_size = queue->alloc_size * sizeof(void *);
mdbgf("%s: realloc from %d to %d\n", queue->description, data_size, data_size*2);
queue->data = realloc(queue->data, data_size*2);
assert( queue->data != NULL );
queue->alloc_size *= 2;
}
queue->data[queue->rear] = item;
uint64_t size = mqueue_size_int(queue);
#ifdef MIKELIBC_THREADS
mdbgf("%s: signaling empty cond and releasing mutex\n", queue->description);
cnd_signal(&(queue->empty));
mtx_unlock(&(queue->mutex));
#endif
return size;
}
void *mqueue_dequeue(mqueue_t *queue) {
mdbgf("%s: dequeueing item\n", queue->description);
void *item;
#ifdef MIKELIBC_THREADS
mdbgf("%s: locking mutex for dequeue\n", queue->description);
mtx_lock(&(queue->mutex));
mdbgf("%s: got the lock\n", queue->description);
while (mqueue_size_int(queue) == 0) {
mdbgf("%s: is empty, waiting\n", queue->description);
cnd_wait(&(queue->empty), &(queue->mutex));
}
#else
if (mqueue_size_int(queue) == 0) {
return NULL;
}
#endif
item = queue->data[queue->front];
queue->front++;
queue->gc_run--;
// Housekeeping
if (queue->gc_run == 0) {
mqueue_vacuum(queue);
queue->gc_run = GC_COUNT;
}
#ifdef MIKELIBC_THREADS
mdbgf("%s: signaling full cond and releasing mutex\n", queue->description);
cnd_signal(&(queue->full));
mtx_unlock(&(queue->mutex));
#endif
return item;
}
void mqueue_vacuum(mqueue_t *queue) {
// Note: This runs inside of the acquired mutex if running
// multi-threaded.
mdbgf("%s: in mqueue_vacuum: queue->front is %d, queue->rear is %d\n",
queue->description, queue->front, queue->rear);
if (queue->front == 0) {
// Nothing to do.
mdbgf("%s: nothing to do\n", queue->description);
return;
}
// Converting a void** to a void* for memmove.
void *dest = queue->data;
void *src = queue->data + queue->front;
size_t bytes2copy = sizeof(void *) * mqueue_size_int(queue);
mdbgf("%s: moving %d bytes to fix slack\n", queue->description, bytes2copy);
dest = memmove(dest, src, bytes2copy);
queue->rear -= queue->front;
queue->front = 0;
mdbgf("%s: queue->front is now 0, queue->rear is now %d\n",
queue->description, queue->rear);
}