-
Notifications
You must be signed in to change notification settings - Fork 2
/
snbuf.cpp
365 lines (338 loc) · 11.4 KB
/
snbuf.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
// Supernova Buffer Code
//
// K Labe June 17 2014
// K Labe September 26 2014 Add code to handle end of file and buffer saving
// K Labe November 2 2014 Use a single contiguous block of memory for buffer
// K Labe April 7 2016 Modify FillHeaderBuffer to check for run type
#include "PZdabFile.h"
#include "PZdabWriter.h"
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "struct.h"
#include "snbuf.h"
#include "curl.h"
#include "output.h"
#define MAXSIZE 30472 // Largest possible event
struct burststate
{
int head;
int tail;
bool burst;
};
static const uint64_t maxtime = (1UL << 43);
static char* burstname;
// Stuff for the burst buffer
static const int EVENTNUM = 1000; // Maximum Burst buffer depth
static const int ENDWINDOW = 1*50000000; // Integration window for ending bursts
static char* burstev[EVENTNUM]; // Burst Event Buffer
static uint64_t bursttime[EVENTNUM]; // Burst Time Buffer
static burststate burstptr; // Object to hold pointers to head and tail of burst
static uint64_t starttick = 0; // Start time (in 50 MHz ticks) of burst
static int burstindex = 0; // Number of bursts seen
static int bcount = 0; // Number of events in present burst
// Stuff for the header buffer
static const int headertypes = 3;
static const uint32_t Headernames[headertypes] =
{ RHDR_RECORD, TRIG_RECORD, EPED_RECORD };
static char* header[headertypes];
// These are the filenames for storing the buffer between subfiles
static const char* fnburststate = "burststate.txt";
static const char* fnburstev = "burstev.bin";
static const char* fnbursttime = "bursttime.txt";
// This function initializes the two SN Buffers. It tries to read in the
// state of the buffer from file, or otherwise initializes it empty. It also
// initializes the header buffer.
void InitializeBuf(){
// Try to read from file
FILE* fburststate = fopen(fnburststate, "r");
FILE* fburstev = fopen(fnburstev, "rb");
FILE* fbursttime = fopen(fnbursttime, "r");
if(fburststate && fburstev && fbursttime){
fscanf(fburststate, "%d %d %d", &burstptr.head, &burstptr.tail,
&burstptr.burst);
burstev[0] = (char*) malloc(MAXSIZE*sizeof(uint32_t)*EVENTNUM);
if(burstev[0] == NULL){
printf("Error: SN Buffer could not be initialized.\n");
alarm(40, "Stonehenge: SN Buffer could not be initialized.", 12);
exit(1);
}
for(int i=0; i<EVENTNUM; i++){
if(fscanf(fbursttime, "%llu \n", &bursttime[i]) != 1)
bursttime[i] = 0;
burstev[i] = (char*) (burstev[0] + i*MAXSIZE*sizeof(uint32_t));
}
double fburstevsize = ftell(fburstev);
if(fread(burstev[0], sizeof(char), sizeof(burstev), fburstev) != fburstevsize){
memset(burstev[0], 0, MAXSIZE*sizeof(uint32_t)*EVENTNUM);
}
// TODO: Handle the case of burst on file start correctly
// For now, just pretend we're not in the middle of a burst
if(burstptr.burst){
burstptr.burst = false;
}
}
// Otherwise, initialize empty
else{
burstev[0] = (char*) malloc(MAXSIZE*sizeof(uint32_t)*EVENTNUM);
if(burstev[0] == NULL){
printf("Error: SN Buffer could not be initialized.\n");
alarm(40, "Stonehenge: SN Buffer could not be initialized.", 12);
exit(1);
}
for(int i=0; i<EVENTNUM; i++){
burstev[i] = (char*) (burstev[0] + i*MAXSIZE*sizeof(uint32_t));
bursttime[i]=0;
}
burstptr.head = -1;
burstptr.tail = -1;
burstptr.burst = false;
}
// Set up the header buffer
for(int i=0; i<headertypes; i++){
header[i] = (char*) malloc(NWREC);
memset(header[i], 0, NWREC);
}
// Close files if necessary
if(fburststate) fclose(fburststate);
if(fburstev) fclose(fburstev);
if(fbursttime) fclose(fbursttime);
}
// This function clears the pre-loaded buffer if the times are in the future
void Checkbuffer(uint64_t firsttime){
if(!burstptr.head==-1){
uint64_t oldtime = bursttime[burstptr.head];
if( firsttime < oldtime ){
memset(burstev[0], 0, MAXSIZE*sizeof(uint32_t)*EVENTNUM);
for(int i=0; i<EVENTNUM; i++){
bursttime[i] = 0;
}
burstptr.head = -1;
burstptr.tail = -1;
}
}
}
// This function drops old events from the buffer once they expire
void UpdateBuf(uint64_t longtime, int BurstLength){
// The case that the buffer is empty
if(burstptr.head==-1)
return;
// Normal Case
int BurstTicks = BurstLength*50000000; // length in ticks
while((bursttime[burstptr.head] < longtime - BurstTicks) && (burstptr.head!=-1)){
bursttime[burstptr.head] = 0;
memset(burstev[burstptr.head], 0, MAXSIZE*sizeof(uint32_t));
AdvanceHead();
// Reset to empty state if we have emptied the queue
if(burstptr.head==burstptr.tail){
burstptr.head=-1;
burstptr.tail=-1;
}
}
}
// This fuction adds events to an open Burst File
void AddEvBFile(PZdabWriter* const b){
// Write out the data
if(b->WriteBank(
PZdabFile::GetBank((nZDAB*) burstev[burstptr.head]), kZDABindex)){
fprintf(stderr, "Error writing zdab to burst file\n");
alarm(30, "Stonehenge: Error writing zdab to burst file", 0);
}
// Drop the data from the buffer
memset(burstev[burstptr.head], 0, MAXSIZE*sizeof(uint32_t));
bursttime[burstptr.head] = 0;
AdvanceHead();
bcount++;
}
// This function adds a new event to the buffer
void AddEvBuf(const nZDAB* const zrec, const uint64_t longtime,
const uint32_t reclen, PZdabWriter* const b){
// Check whether we will overflow the buffer
// If so, first drop oldest event, then write
if(burstptr.head==burstptr.tail && burstptr.head!=-1){
fprintf(stderr, "ALARM: Burst Buffer has overflowed!\n");
alarm(30, "Stonehenge: Burst buffer has overflown.", 0);
if(!burstptr.burst){
fprintf(stderr, "ALARM: Burst Threshold larger than buffer!\n");
alarm(30, "Stonehenge: Burst threshold larger than buffer.", 0);
}
else
AddEvBFile(b);
}
// Write the event to the buffer
// If buffer empty, set pointers appropriately
if(burstptr.tail==-1){
burstptr.tail=0;
burstptr.head=0;
}
if(reclen < MAXSIZE*4){
memcpy(burstev[burstptr.tail], zrec, reclen);
}
else{
char buf[128];
sprintf(buf, "ALARM: Event too big for buffer! %d bytes!"
" Skipping this event.¬ify\n", reclen);
fprintf(stderr, buf);
alarm(30, buf, 0);
}
bursttime[burstptr.tail] = longtime;
if(burstptr.tail<EVENTNUM - 1)
burstptr.tail++;
else
burstptr.tail=0;
}
// This function computes the number of burst candidate events currently
// in the buffer
int Burstlength(){
int burstlength = 0;
if(burstptr.head!=-1){
if(burstptr.head<burstptr.tail)
burstlength = burstptr.tail - burstptr.head;
else
burstlength = EVENTNUM + burstptr.tail - burstptr.head;
}
return burstlength;
}
// This function writes out the allowable portion of the buffer to a burst file
void Writeburst(uint64_t longtime, PZdabWriter* b){
while((bursttime[burstptr.head] < longtime - ENDWINDOW) && (burstptr.head < burstptr.tail)){
AddEvBFile(b);
}
}
// This function opens a new burst file
void Openburst(PZdabWriter* & b, uint64_t longtime, char* outfilebase,
bool clobber){
starttick = bursttime[burstptr.head];
char buff[128];
sprintf(buff, "Burst %i has begun!\n", burstindex);
fprintf(stderr, buff);
alarm(20, buff, 0);
char namebuff[128];
sprintf(namebuff, "%s_%s_%i", burstname, outfilebase, burstindex);
b = Output(namebuff, clobber, 1);
for(int i=0; i<headertypes; i++){
OutHeader((nZDAB*) header[i], b);
}
}
// This function writes out the remainder of the buffer when burst ends
void Finishburst(PZdabWriter* & b, uint64_t longtime){
while(burstptr.head < burstptr.tail+1){
AddEvBFile(b);
}
burstptr.head = -1;
burstptr.tail = -1;
b->Close();
delete b;
uint64_t btime = longtime - starttick;
float btimesec = btime/50000000.;
char buff[256];
sprintf(buff, "Burst %i has ended. It contains %i events and lasted"
" %.2f seconds.\n", burstindex, bcount, btimesec);
fprintf(stderr, buff);
alarm(20, buff, 0);
burstindex++;
// Reset to prepare for next burst
bcount = 0;
burstptr.burst = false;
}
// This function saves the buffer state to disk.
// Burstev is saved in binary, bursttime and burststate are saved in ascii
void Saveburstbuff(){
FILE* fburststate = fopen(fnburststate, "w");
FILE* fburstev = fopen(fnburstev, "wb");
FILE* fbursttime = fopen(fnbursttime, "w");
fwrite(burstev[0], sizeof(char), MAXSIZE*sizeof(uint32_t)*EVENTNUM, fburstev);
for(int i=0; i<EVENTNUM; i++){
fprintf(fbursttime, "%llu \n", bursttime[i]);
}
fprintf(fburststate, "%d %d %d", burstptr.head, burstptr.tail, burstptr.burst);
fclose(fburststate);
fclose(fburstev);
fclose(fbursttime);
}
// This function manages the writing of events into a burst file.
bool Burstfile(PZdabWriter* & b, configuration config, alltimes alltime,
char* outfilebase, bool clobber){
// Open a new burst file if a burst starts
if(!burstptr.burst){
if(Burstlength() > config.burstsize){
Openburst(b, alltime.longtime, outfilebase, clobber);
burstptr.burst = true;
}
}
// While in a burst
if(burstptr.burst){
Writeburst(alltime.longtime, b);
// Check whether the burst has ended
if(Burstlength() < config.endrate){
Finishburst(b, alltime.longtime);
}
}
return burstptr.burst;
}
// This function wraps up the burst buffer when the end of file is reached
void BurstEndofFile(PZdabWriter* & b, uint64_t longtime){
Saveburstbuff();
if(burstptr.burst)
Finishburst(b, longtime);
}
// This function advances the head pointer appropriately
void AdvanceHead(){
if(burstptr.head < EVENTNUM - 1)
burstptr.head++;
else
burstptr.head = 0;
}
// This function is used to reset the buffer if the events
// arrive out of order in a non-recoverable way.
void ClearBuffer(PZdabWriter* & b, uint64_t longtime){
if(burstptr.burst)
Finishburst(b, longtime);
else{
memset(burstev[0], 0, MAXSIZE*sizeof(uint32_t)*EVENTNUM);
for(int i=0; i<EVENTNUM; i++){
bursttime[i] = 0;
}
burstptr.head = -1;
burstptr.tail = -1;
burstptr.burst = false;
}
}
// This function checks whether the passed record is a header record, and,
// if it is, writes it to the header buffer.
// It also checks the run type for RHDR records. It returns 0 if the record
// was not a RHDR, and the run type if it was.
uint32_t FillHeaderBuffer(nZDAB* const zrec){
uint32_t runtype = 0;
for(int i=0; i<headertypes; i++){
if(zrec->bank_name == Headernames[i]){
memset(header[i], 0, NWREC);
// Add 9 words for the length of the nZDAB header itself
// Copy the data to buffer in native format
uint32_t recLen = zrec->data_words+9;
memcpy(header[i], zrec, recLen*sizeof(uint32_t));
// For RHDR's pull out run type to return
if(i==0){
RunRecord* rhdr = (RunRecord*) (zrec+1);
SWAP_INT32(rhdr, 9);
runtype = rhdr->RunMask;
fprintf(stderr, "runtype: %d\n", runtype);
SWAP_INT32(rhdr, 9);
}
}
}
return runtype;
}
// This function returns the epoch value used to write timestamp
// bursttime[burstptr.head]
int GetEpoch()
{
uint64_t time = bursttime[burstptr.head];
int epoch = time/maxtime;
return epoch;
}
// This function sets the burst directory
void setburst(char* burstdir){
burstname = burstdir;
}