-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_ysb_flowgraph_batched.cpp
145 lines (140 loc) · 5.3 KB
/
test_ysb_flowgraph_batched.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/******************************************************************************
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License version 3 as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
******************************************************************************
*/
/*
* Test application of the Yahoo! Streaming Benchmark
* (TBB FlowGraph version). Batched implementation.
*
* The application is a pipeline of four stages:
* EventSource (generator of events at full speed)
* Filter
* Join
* Window Aggregate
*/
// include
#include <fstream>
#include <iostream>
#include <iterator>
#include "tbb/tbb.h"
#include "tbb/flow_graph.h"
#include <ysb_common.hpp>
#include <ysb_nodes_batched.hpp>
#include <campaign_generator.hpp>
// Global variable: starting time of the execution, in microseconds.
// Only declared here (extern); main() assigns it from current_time_usecs()
// right before the sources are activated.
extern volatile unsigned long start_time_usec;
// Global variable: number of generated events.
// Only declared here (extern); main() resets it to zero at startup and
// prints it as the total number of generated messages at the end.
extern atomic<long> sentCounter;
/*
 * Entry point of the batched YSB benchmark built on TBB FlowGraph.
 *
 * Command-line options (at least four option/value pairs are expected):
 *   -l <seconds>  execution time forwarded to every YSBSourceBatched
 *   -n <degree>   number of source -> filter -> join chains
 *   -m <degree>   number of window-aggregate nodes
 *   -b <length>   batch length forwarded to every YSBSourceBatched
 *   -t <threads>  number of TBB threads; the TBB default is used when the
 *                 option is missing or not positive
 *
 * The function builds the graph, activates the sources, waits for the graph
 * to drain and prints the final statistics on standard output.
 *
 * Returns 0 on normal completion. On a usage error (too few arguments,
 * unknown option, negative value for -l/-n/-m/-b) the usage line is printed
 * and the process exits with EXIT_FAILURE.
 */
int main(int argc, char *argv[])
{
    unsigned long exec_time_sec = 0;
    size_t pardegree1 = 1;
    size_t pardegree2 = 1;
    int TBBThreads = -1;
    size_t batch_len = 1;
    // initialize global sentCounter
    sentCounter = 0;

    // print the usage line and terminate; a usage error is a failure, so the
    // exit status must be non-zero for calling scripts to notice it
    auto usage_and_exit = [&]() {
        cout << argv[0] << " -l [execution_seconds] -n [par_degree] -m [par_degree] [-t numTBBThreads] -b [batch len]" << endl;
        exit(EXIT_FAILURE);
    };
    // parse an option whose destination is unsigned: atoi() returns an int,
    // and a negative int would silently wrap to a huge unsigned value
    auto parse_non_negative = [&](const char *text) -> unsigned long {
        const int value = atoi(text);
        if (value < 0)
            usage_and_exit();
        return static_cast<unsigned long>(value);
    };

    // arguments from command line
    if (argc < 9)
        usage_and_exit();
    int option = 0;
    while ((option = getopt(argc, argv, "l:n:m:t:b:")) != -1) {
        switch (option) {
            case 'l':
                exec_time_sec = parse_non_negative(optarg);
                break;
            case 'n':
                pardegree1 = parse_non_negative(optarg);
                break;
            case 'm':
                pardegree2 = parse_non_negative(optarg);
                break;
            case 't':
                // negative/zero is allowed here: it selects the TBB default
                TBBThreads = atoi(optarg);
                break;
            case 'b':
                batch_len = parse_non_negative(optarg);
                break;
            default:
                usage_and_exit();
        }
    }

    // initialize TBB environment
    tbb::task_scheduler_init init((TBBThreads > 0) ? TBBThreads : tbb::task_scheduler_init::default_num_threads());
    // create the campaigns
    CampaignGenerator campaign_gen;
    // the application graph
    graph g;

    // containers of the TBB FlowGraph nodes
    vector<source_node_batched_t *> sources;
    vector<filter_node_batched_t *> filters;
    vector<map_node_batched_t *> maps;
    vector<window_node_batched_t *> workers;

    // create the TBB FlowGraph nodes (right part) first: every join body is
    // constructed from the workers vector, so the vector is filled before any
    // join is built. This is correct whether YSBJoinBatched keeps a reference
    // to the vector or takes a copy of it.
    // (no null checks after new: a plain new-expression never returns null)
    for (size_t i = 0; i < pardegree2; ++i) {
        // create the aggregation
        workers.push_back(new window_node_batched_t(g, 1, WinAggregateBatched(i, pardegree1)));
    }
    // create the TBB FlowGraph nodes (left part)
    for (size_t i = 0; i < pardegree1; ++i) {
        // create source
        sources.push_back(new source_node_batched_t(g, YSBSourceBatched(exec_time_sec, campaign_gen.getArrays(), campaign_gen.getAdsCompaign(), batch_len)));
        // create filter
        filters.push_back(new filter_node_batched_t(g, unlimited, YSBFilterBatched()));
        // create the flat-map
        maps.push_back(new map_node_batched_t(g, unlimited, YSBJoinBatched(workers, campaign_gen.getHashMap(), campaign_gen.getRelationalTable())));
    }
    // create the connections between nodes (no edge is made towards the
    // workers here; the join bodies receive the workers vector instead)
    for (size_t i = 0; i < pardegree1; ++i) {
        make_edge(*sources[i], *filters[i]);
        make_edge(*filters[i], *maps[i]);
    }

    // initialize global start_time_usec
    const unsigned long start_time_main_us = current_time_usecs();
    start_time_usec = start_time_main_us;
    // starting all sources
    for (size_t i = 0; i < pardegree1; ++i)
        sources[i]->activate();
    // waiting for termination
    g.wait_for_all();

    // final statistics
    const unsigned long end_time_main_us = current_time_usecs();
    const double elapsed_time_sec = (end_time_main_us - start_time_main_us) / (1000000.0);
    unsigned long rcvResults = 0;
    for (size_t i = 0; i < pardegree2; ++i) {
        // read the counter from a copy of the body held by the node
        auto body = copy_body<WinAggregateBatched, window_node_batched_t>(*workers[i]);
        rcvResults += body.rcvResults();
    }
    cout << "[Main] Total generated messages are " << sentCounter << endl;
    cout << "[Main] Total received results are " << rcvResults << endl;
    cout << "[Main] Throughput " << sentCounter/elapsed_time_sec << endl;
    cout << "[Main] Total elapsed time (seconds) " << elapsed_time_sec << endl;

    // delete all the created nodes/operators
    for (size_t i = 0; i < pardegree1; ++i) {
        delete sources[i];
        delete filters[i];
        delete maps[i];
    }
    for (size_t i = 0; i < pardegree2; ++i) {
        delete workers[i];
    }
    return 0;
}