-
Notifications
You must be signed in to change notification settings - Fork 1
/
s3select_example.cpp
237 lines (183 loc) · 5.18 KB
/
s3select_example.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#include "s3select.h"
#include <fstream>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include "arrow/io/api.h"
#include "arrow/ipc/feather.h"
#include <arrow/api.h>
#include <fstream>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/table.h>
#include <arrow/ipc/api.h>
#include <arrow/ipc/options.h>
#include <arrow/filesystem/api.h>
using arrow::Int64Builder;
using arrow::StringBuilder;
using arrow::schema;
using arrow::Table;
using arrow::Status;
using arrow::ipc::RecordBatchWriter;
using namespace s3selectEngine;
using namespace BOOST_SPIRIT_CLASSIC_NS;
bool is_parquet_file(const char * fn)
{//diffrentiate between csv and parquet
const char * ext = "parquet";
if(strstr(fn+strlen(fn)-strlen(ext), ext ))
{
return true;
}
return false;
}
int run_query_on_parquet_file(const char* input_query, const char* input_file)
{
int status;
s3select s3select_syntax;
status = s3select_syntax.parse_query(input_query);
if (status != 0)
{
std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
return -1;
}
parquet_object parquet_processor(input_file,&s3select_syntax);
std::string result;
do
{
try
{
status = parquet_processor.run_s3select_on_object(result); //TODO error description
}
catch (base_s3select_exception &e)
{
std::cout << e.what() << std::endl;
//m_error_description = e.what();
//m_error_count++;
if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
{
return -1;
}
}
std::cout << result << std::endl;
if (status < 0)
break;
} while (1);
return 0;
}
int main(int argc, char** argv)
{
//purpose: demostrate the s3select functionalities
s3select s3select_syntax;
char* input_query = 0;
for (int i = 0; i < argc; i++)
{
if (!strcmp(argv[i], "-q"))
{
input_query = argv[i + 1];
}
}
if (!input_query)
{
std::cout << "type -q 'select ... from ... '" << std::endl;
return -1;
}
int status = s3select_syntax.parse_query(input_query);
if (status != 0)
{
std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
return -1;
}
std::string object_name = s3select_syntax.get_from_clause();
if (is_parquet_file(object_name.c_str()))
{
try {
return run_query_on_parquet_file(input_query, object_name.c_str());
}
catch (base_s3select_exception &e)
{
std::cout << e.what() << std::endl;
if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
{
return -1;
}
}
}
FILE* fp;
if (object_name.compare("stdin")==0)
{
fp = stdin;
}
else
{
fp = fopen(object_name.c_str(), "r");
}
if(!fp)
{
std::cout << " input stream is not valid, abort;" << std::endl;
return -1;
}
struct stat statbuf;
bool to_aggregate=false;
lstat(object_name.c_str(), &statbuf);
std::string s3select_result;
s3selectEngine::csv_object::csv_defintions csv;
csv.use_header_info = false;
//csv.column_delimiter='|';
//csv.row_delimiter='\t';
s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv);
//s3selectEngine::csv_object s3_csv_object(&s3select_syntax);
#define BUFF_SIZE 1024*1024*4
char* buff = (char*)malloc( BUFF_SIZE );
while(1)
{
//char buff[4096];
//char * in = fgets(buff,sizeof(buff),fp);
size_t input_sz = fread(buff, 1, BUFF_SIZE, fp);
char* in=buff;
std::shared_ptr<arrow::Table> arrowtable;
if(fp == stdin)
{
if (feof(fp))
{
to_aggregate = true;
}
status = s3_csv_object.run_s3select_on_object(s3select_result,in,input_sz,false,false,to_aggregate);
}
else
{
// status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, statbuf.st_size);
status = s3_csv_object.run_s3select_on_stream(arrowtable, in, input_sz, statbuf.st_size);
std::cout <<"Received Arrow table. Creating the Arrow file at testthearrow.arrow."<<std::endl;
std::string arrow_filename = "testthearrow.arrow";
auto fs = new arrow::fs::LocalFileSystem(arrow::fs::LocalFileSystemOptions::Defaults());
auto output_file_open_result = fs->OpenOutputStream(arrow_filename);
auto output_file = output_file_open_result.ValueOrDie();
std::cout <<"Creating file in progress.."<<std::endl;
auto batch_writer_result= arrow::ipc::MakeFileWriter(output_file,arrowtable->schema());
auto batch_writer = batch_writer_result.ValueOrDie();
batch_writer->WriteTable(*arrowtable);
std::cout <<"File created."<<std::endl;
batch_writer->Close();
}
if(status<0)
{
std::cout << "failure on execution " << std::endl << s3_csv_object.get_error_description() << std::endl;
break;
}
if(s3select_result.size()>1)
{
std::cout << s3select_result;
}
s3select_result = "";
if(!input_sz || feof(fp))
{
break;
}
}
free(buff);
fclose(fp);
return 0;
}