-
Notifications
You must be signed in to change notification settings - Fork 2
/
Proxy.c
275 lines (196 loc) · 7.99 KB
/
Proxy.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
#include "Proxy.h"
extern int debug_level;
/* Microseconds elapsed between two gettimeofday() samples.  The seconds are
 * subtracted before scaling so the intermediate value stays small instead of
 * multiplying an absolute epoch timestamp by one million. */
static long proxyElapsedUsec(const struct timeval *start, const struct timeval *end)
{
    return (long)(end->tv_sec - start->tv_sec) * 1000000L
         + (long)(end->tv_usec - start->tv_usec);
}

/* Replaces the contents of `path` with `len` bytes taken from `data`.
 * `create_err` is reported through systemError()/debug_print() when the file
 * cannot be opened.  Returns 0 on success and -1 on any failure; the stream is
 * never used after a failed fopen(). */
static int proxyWriteFile(const char *path, const char *data, size_t len, char *create_err)
{
    FILE *fp = fopen(path, "w");
    if (fp == NULL) {
        systemError(create_err);
        debug_print(debug_level, ERROR_LOG, "%s\n", create_err);
        return -1;
    }

    size_t written = (len > 0) ? fwrite(data, sizeof(char), len, fp) : 0;
    int close_rc = fclose(fp);
    if (written != len || close_rc != 0) {
        debug_print(debug_level, ERROR_LOG, "Incomplete write to %s\n", path);
        return -1;
    }
    return 0;
}

/* Runs an already formatted shell command through system().
 * `fmt_len` is the value snprintf() returned while building `command` into a
 * buffer of `cap` bytes; a truncated command is reported and NOT executed.
 * `err_msg` is logged when system() returns a negative value (same check the
 * original code used; the command's own exit status is not inspected). */
static void proxyRunCommand(const char *command, int fmt_len, size_t cap, char *err_msg)
{
    if (fmt_len < 0 || (size_t)fmt_len >= cap) {
        debug_print(debug_level, ERROR_LOG, "%s (command line too long)\n", err_msg);
        return;
    }
    if (system(command) < 0)
        debug_print(debug_level, ERROR_LOG, "%s\n", err_msg);
}

/*
 * Proxy side of one query round trip:
 *   1. receive the query text from CLIENT (tag Q_TAG),
 *   2. run the query planner and log how long planning took,
 *   3. produce RESULT_FILE in one of three ways, depending on the plan:
 *        - num_join_nodes < 0 : write the "<empty result>" marker,
 *        - at most one (unmerged) join node : run rdf3xquery locally,
 *        - otherwise : dispatch sub-queries to workers, store the reply,
 *          sort it and convert ids to names with ./id2name,
 *   4. send the content of RESULT_FILE (or a "No Match Found" banner when it
 *      is empty) to CLIENT with tag Q_RESULT_TAG,
 *   5. print the elapsed execution time on stdout as "dream: <sec>.<usec>".
 *
 * NOTE(review): `query` and `Q` are never released here; whether the planner
 * keeps references to `query` is not visible from this file, so they are left
 * alone -- confirm ownership before adding a free.
 */
void doProxy()
{
    char *final_result = NULL, *partial_result = NULL;
    char *query = NULL;
    char *no_match = " ******** No Match Found! ******** ";
    char filename[64];
    char command[1024];
    int num_workers_to_send_results = 0;
    int n;
    MPI_Status status;
    Node *node = NULL;
    Query *Q = NULL;
    struct timeval start_tv, end_tv;
    struct timeval start_pq, end_pq;

    initQueryPlanner();

    /***** receive query from the Client *****/
    MPI_URecv(&query, MPI_CHAR, CLIENT, Q_TAG, MPI_COMM_WORLD, &status);
    debug_print(debug_level, COMM_LOG, "PROXY received from Client:\n%s\n", query);

    /***** create & assign sub-queries using the query planner *****/
    gettimeofday(&start_pq, NULL);
    Q = planQuery(query);
    gettimeofday(&end_pq, NULL);

    long plan_usec = proxyElapsedUsec(&start_pq, &end_pq);
    debug_print(debug_level, Q_PLAN_LOG,
                "Proxy finished the query planning in %ld.%06ld seconds\n",
                plan_usec / 1000000, plan_usec % 1000000);

    debug_print(debug_level, Q_PLAN_LOG,
                "---> Final number of join nodes: %hd\n", Q->num_join_nodes);
    debug_print(debug_level, Q_PLAN_LOG,
                "---> Final number of merged nodes: %hd\n", Q->num_merged_nodes);

    gettimeofday(&start_tv, NULL);

    if (Q->num_join_nodes < 0) {
        /***** The planner reported an empty result set ****/
        char *empty_marker = "<empty result>";
        /* The file is read back once, below, together with the other cases. */
        proxyWriteFile(RESULT_FILE, empty_marker, strlen(empty_marker),
                       "Couldn't create a result file");
    }
    else if (Q->num_join_nodes <= 1
             || Q->num_join_nodes - Q->num_merged_nodes <= 1) {
        /***** One machine (the PROXY) can handle the query: run rdf3x
         ***** directly, since no data has to be communicated ****/
        snprintf(filename, sizeof filename, "subquery_%d.txt", PROXY);

        /* Only launch the engine when its input file was actually written. */
        if (proxyWriteFile(filename, query, strlen(query),
                           "Could not open subQ file") == 0) {
            n = snprintf(command, sizeof command,
                         "./rdf3x/rdf3xquery %s < %s > %s",
                         DB, filename, RESULT_FILE);
            proxyRunCommand(command, n, sizeof command,
                            "Failed to execute rdf3xquery");
        }
    }
    else {
        /***** More than one machine is needed (>1 join node) ****/
        for (int i = 0; i < Q->num_join_nodes; i++) {
            node = Q->compactGraph[i];
            if (!node->is_deleted && node->subQ->optimalset != NULL)
                sendSubQToWorker(Q, node, &num_workers_to_send_results);
        }

        /* Built before the loop so the sort/convert commands below always see
         * an initialized name, even when no worker is expected to reply. */
        snprintf(filename, sizeof filename, "result_%d.txt", PROXY);

        /***** receive partial results to union ****/
        /* NOTE(review): every reply truncates and rewrites the same file, so
         * with several replying workers only the last reply survives.  Kept
         * as in the original; confirm against the worker protocol whether the
         * replies should be appended instead. */
        for (int j = 0; j < num_workers_to_send_results; j++) {
            int len = MPI_URecv(&partial_result, MPI_CHAR, MPI_ANY_SOURCE,
                                RESULT_SET_TO_UNION_TAG, MPI_COMM_WORLD, &status);
            debug_print(debug_level, COMM_LOG,
                        "PROXY recvd final result of length %d from Worker %d\n",
                        len, status.MPI_SOURCE);

            proxyWriteFile(filename, partial_result,
                           len > 0 ? (size_t)len : 0,
                           "Couldn't create a temp. result file");
            free(partial_result);
            partial_result = NULL;
        }

        /** issue a "make clean" **/
        n = snprintf(command, sizeof command, "make clean\n");
        proxyRunCommand(command, n, sizeof command, "Failed to issue make clean");

        /** sort the ids before converting **/
        n = snprintf(command, sizeof command, "sort -n -o %s %s", filename, filename);
        proxyRunCommand(command, n, sizeof command, "Failed to sort IDs");

        /** remove previous result file **/
        n = snprintf(command, sizeof command, "rm %s\n", RESULT_FILE);
        proxyRunCommand(command, n, sizeof command,
                        "Failed to send delete command to terminal");

        /** convert ids 2 names **/
        n = snprintf(command, sizeof command, "./id2name %s %s %s\n",
                     DB, filename, RESULT_FILE);
        proxyRunCommand(command, n, sizeof command,
                        "Failed to convert IDs to Names");
    }

    gettimeofday(&end_tv, NULL);

    /***** send result back to the Client ****/
    final_result = readFile(RESULT_FILE);
    /* A missing buffer is handled like an empty result; presumably readFile()
     * can return NULL when the file is absent -- TODO confirm. */
    if (final_result == NULL
        || final_result[0] == '\0'
        || strcmp(final_result, "\n") == 0) {
        free(final_result);   /* release the empty buffer before re-reading */
        proxyWriteFile(RESULT_FILE, no_match, strlen(no_match),
                       "Couldn't create a result file");
        final_result = readFile(RESULT_FILE);
    }

    /* Fall back to the in-memory banner if the file could not be read back. */
    char *payload = (final_result != NULL) ? final_result : no_match;
    MPI_Send(payload, (int)strlen(payload),
             MPI_CHAR, CLIENT, Q_RESULT_TAG, MPI_COMM_WORLD);
    free(final_result);

    long exec_usec = proxyElapsedUsec(&start_tv, &end_tv);
    printf("dream: %ld.%06ld\n", exec_usec / 1000000, exec_usec % 1000000);
}