-
Notifications
You must be signed in to change notification settings - Fork 0
/
thread.c
226 lines (197 loc) · 6.95 KB
/
thread.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#include "sort.h"
#include "stdio.h"
#include "stdlib.h"
#include "thread.h"
#include "string.h"
#include "sys/mman.h"
#include "unistd.h"
#include "pthread.h"
// two's complement to unsigned
uint32_t flip_sign(key_t key) {
return key ^ 0x80000000;
}
// ABC implementation from
// https://brilliant.org/wiki/radix-sort/
//
// All shifting and masking original, array layouts from brilliant
// count_n = C
// lower = B (kind of, B doesn't hold the values, only pointers to the values)
// start = A
// sort BITS_AT_ONCE bits at a time
// NUM_POS_VALUES = 2^BITS_AT_ONCE
//
//
// assume iteration = 1, BITS AT_ONCE = 4
// start[k].key = 0x1A93CFD2
// mask << (iteration*BITS_AT_ONCE) = 0x000000F0
// start[k].key & mask = 0x000000D0
// start[k].key & mask >> (iteration*BITS_AT_ONCE) = 0x0000000D
// indexable ^^^^^^^^
//
// sorts keys in memory into lower
int counting_sort(record* start, int size, record* lower, int iteration) {
key_t mask = (NUM_POS_VALUES-1);
int C[NUM_POS_VALUES];
mask = mask << (iteration*BITS_AT_ONCE);
for(int j = 0; j<NUM_POS_VALUES; j++) {
C[j] = 0;
}
// count occurences of mask
for(int i = 0; i<size; i++) {
// eg. C[0xF & key] += 1
C[(unsigned)(flip_sign(start[i].key) & mask) >> (iteration*BITS_AT_ONCE)] += 1;
// IMPORTANT TO USE UNSIGNED
}
// get sum-up_to[j]
// C[0] = count_n[0];
for(int j = 1; j<NUM_POS_VALUES; j++) {
C[j] = C[j] + C[j-1];
}
// final sort
for(int k = size-1; k>=0; k--) {
C[(unsigned)(flip_sign(start[k].key) & mask) >> (iteration*BITS_AT_ONCE)] -= 1;
lower[C[(unsigned)(flip_sign(start[k].key) & mask) >> (iteration*BITS_AT_ONCE)]] = start[k];
}
// for(int i = 0; i < NUM_POS_VALUES; i++) {
// atomic_fetch_add_explicit(&s_count->remaining[i], count_n[i], memory_order_acq_rel);
// }
return 0;
}
// returns count added, if fail -1
int get_msb_like_me(record* lower, record* readin, record** extras, unsigned int tid, int total_records, int num_in_lower, int* extras_made) {
int total_count = 0;
int lower_count = 0;
record* curr_extra = NULL;
int curr_extra_count = 0;
int curr_extra_idx = 0;
for(int i = 0; i < total_records; i++) {
// printf("%i, %X, %p\n", i, KEY_MASK, readin);
if(((KEY_MASK & flip_sign(readin[i].key)) >> 28) == tid) {
// printf("%08x, %08X, %08X, %X\n", readin[i].key, readin[i].key & KEY_MASK, (readin[i].key & KEY_MASK) >> 28, tid);
if(lower_count >= num_in_lower) {
// set up new array in extras if needed
if(curr_extra_count >= EXTRA_SIZE || !curr_extra) {
// printf("%i made extra \n", tid);
curr_extra = (record*) malloc(EXTRA_SIZE * sizeof(record));
if(!curr_extra) {
printf("failed to alloc extra");
}
memset(curr_extra, 0, EXTRA_SIZE * sizeof(record));
extras[curr_extra_idx] = curr_extra;
curr_extra_idx += 1;
curr_extra_count = 0;
}
// printf("putting %X in %p[%i] for %X\n", readin[i].key, curr_extra, curr_extra_count, tid);
// populate curr_extra array
curr_extra[curr_extra_count] = readin[i];
curr_extra_count += 1;
total_count += 1;
} else {
lower[lower_count] = readin[i];
lower_count += 1;
total_count += 1;
}
}
}
return total_count;
}
// get idx in out to where thread should write
int get_out_index(int total_in, shared_memory* s_mem, int tid) {
int out = -1;
while(1) {
// try to lock, if fail, wait
if(pthread_mutex_lock(s_mem->lock) == -1) {
pthread_cond_wait(s_mem->checkable, s_mem->lock);
}
if(s_mem->t_turn % THREADS == tid) {
break;
}
pthread_mutex_unlock(s_mem->lock);
}
// LOCKED
out = s_mem->curr_idx;
s_mem->curr_idx += total_in;
s_mem->t_turn += 1;
pthread_cond_broadcast(s_mem->checkable);
pthread_mutex_unlock(s_mem->lock);
// UNLOCKED
return out;
}
void* t_run(void* in_ta) {
// bullshit C setup
thread_args* ta = (thread_args*) in_ta;
// extract ta
globals* global = ta->global;
// shared_count* s_count = ta->s_count;
shared_memory* s_memory = ta->s_memory;
t_radix* thread_mem = ta->thread_mem;
// CHANGE
// ta->my_tid = 1;
t_radix* me = &thread_mem[ta->my_tid];
// setup lower
int num_in_lower = global->total_records / THREADS;
if(num_in_lower < MIN_LOWER) {num_in_lower = MIN_LOWER;}
record* lower = (record*) malloc(num_in_lower * sizeof(record));
if(!lower) {
printf("failed to alloc lower");
exit(EXIT_FAILURE);
}
memset(lower, 0, num_in_lower * sizeof(record));
record** extras = (record**) malloc(NUM_EXTRAS_POS * sizeof(record*));
if(!extras) {
printf("failed to alloc extras");
exit(EXIT_FAILURE);
}
// get my childern and count
// put in lower, then read/write out buffer
int extras_made = -1;
int total_in_me = get_msb_like_me(lower, me->arr_start, extras, me->my_tid, global->total_records, num_in_lower, &extras_made);
// for(int i = 0; i < num_in_lower; i++) {
// printf("%X:\t%08X\n", me->my_tid, lower[i].key);
// }
// get idx in out arr
int out_start = get_out_index(total_in_me, s_memory, me->my_tid);
// get pointer to my section
record* out_arr = &me->out[out_start];
// put all vals into mem
int curr_extra_idx = 0;
int added_from_extra = 0;
for(int i = 0; i < total_in_me; i++) {
if(i >= num_in_lower) {
if(added_from_extra >= EXTRA_SIZE) {
curr_extra_idx += 1;
added_from_extra = 0;
}
out_arr[i] = extras[curr_extra_idx][added_from_extra];
added_from_extra += 1;
} else {
out_arr[i] = lower[i];
}
}
// printf("HEY!\n");
// free lower and extras
if(extras_made > 0) {
for(int i = 0; i < extras_made; i++) {
free(extras[i]);
}
}
free(extras);
free(lower);
// create new lower of proper size
record* fit_lower = (record*) malloc(total_in_me * sizeof(record));
if(!lower) {
printf("failed to alloc lower");
exit(EXIT_FAILURE);
}
memset(fit_lower, 0, total_in_me * sizeof(record));
// actually sort
for(int r = 0; r < KEY_BITS / BITS_AT_ONCE; r++) {
if(r % 2 == 0) {
counting_sort(out_arr, total_in_me, fit_lower, r);
} else {
counting_sort(fit_lower, total_in_me, out_arr, r);
}
}
free(fit_lower);
pthread_exit(0);
}