-
Notifications
You must be signed in to change notification settings - Fork 1
/
orderby.go
197 lines (162 loc) · 4.84 KB
/
orderby.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Copyright © 2024 Peter Haag peter@people.ops-trust.net
// All rights reserved.
//
// Use of this source code is governed by the license that can be
// found in the LICENSE file.
package nfdump
import (
"fmt"
"github.com/twotwotwo/sorts"
)
// sortRecord is the element that is moved around while sorting.
// It carries the value to sort by and the index into the record array of the
// flow record that value belongs to. Keeping the flow record itself out of the
// sort slice is more CPU cache friendly, as only a sortRecord needs to be
// moved in memory.
//
// sortType is the slice type used for sorting.
type (
	sortRecord struct {
		index uint32 // index into the record array
		value uint64 // value to be sorted
	}

	sortType []sortRecord
)
var (
	// sortArray is the slice of all sort records, to be sorted.
	sortArray []sortRecord

	// recordArray is the static record array; it is just filled and its
	// elements are never moved.
	recordArray []*FlowRecordV3
)
// Sort directions.
const (
	// ASCENDING selects ascending sort order.
	ASCENDING = 1
	// DESCENDING selects descending sort order.
	DESCENDING = 2
)
// The methods on sortType implement the interface required by the sorts
// package.

// Len returns the number of elements in the sort slice.
func (a sortType) Len() int {
	return len(a)
}
// Swap exchanges the elements at positions i and j.
func (a sortType) Swap(i, j int) {
	tmp := a[i]
	a[i] = a[j]
	a[j] = tmp
}
// Key returns the sort key (the value) of element i.
func (a sortType) Key(i int) uint64 { return a[i].value }
// Less reports whether the value of element i is smaller than the value of
// element j.
func (a sortType) Less(i, j int) bool {
	left, right := a[i].value, a[j].value
	return left < right
}
// value functions
// valueFuncType is the signature of the functions that return the value to
// be sorted for a given flow record.
type valueFuncType func(record *FlowRecordV3) uint64
// getTstart returns tstart in msec (MsecFirst of the generic flow extension).
// It returns 0 if the record has no generic flow extension.
func getTstart(record *FlowRecordV3) uint64 {
	genericFlow := record.GenericFlow()
	if genericFlow == nil {
		return 0
	}
	return genericFlow.MsecFirst
}
// getTend returns tend in msec (MsecLast of the generic flow extension).
// It returns 0 if the record has no generic flow extension.
//
// The value must come from MsecLast; reading MsecFirst here would make
// ordering by "tend" indistinguishable from ordering by "tstart".
func getTend(record *FlowRecordV3) uint64 {
	var value uint64
	if genericFlow := record.GenericFlow(); genericFlow != nil {
		value = genericFlow.MsecLast
	}
	return value
}
// getPackets returns the packet counter (InPackets of the generic flow
// extension). It returns 0 if the record has no generic flow extension.
func getPackets(record *FlowRecordV3) uint64 {
	genericFlow := record.GenericFlow()
	if genericFlow == nil {
		return 0
	}
	return genericFlow.InPackets
}
// getBytes returns the byte counter (InBytes of the generic flow extension).
// It returns 0 if the record has no generic flow extension.
func getBytes(record *FlowRecordV3) uint64 {
	genericFlow := record.GenericFlow()
	if genericFlow == nil {
		return 0
	}
	return genericFlow.InBytes
}
// orderOption is one order option: its name and the function that returns
// the value to sort by.
type orderOption struct {
	name      string        // name of the order option
	orderFunc valueFuncType // value function for this option
}
// orderTable lists all orderBy options currently implemented.
var orderTable = []orderOption{
	{name: "tstart", orderFunc: getTstart},
	{name: "tend", orderFunc: getTend},
	{name: "packets", orderFunc: getPackets},
	{name: "bytes", orderFunc: getBytes},
}
// OrderBy uses recordChain as input, sorts its records and returns a new
// chain element with a channel of the sorted records.
//
//   - orderBy accepts the names defined in the order table.
//   - direction accepts either ASCENDING or DESCENDING; any value other than
//     ASCENDING is treated as DESCENDING.
//
// If recordChain already carries an error, that error is propagated. If
// orderBy is not found in the order table, the returned chain element has a
// nil channel and an error.
//
// All input records are collected before the first sorted record is written
// to the returned channel, which is closed after the last record.
func (recordChain *RecordChain) OrderBy(orderBy string, direction int) *RecordChain {
	// propagate error, if input void
	if recordChain.err != nil {
		return &RecordChain{recordChan: nil, err: recordChain.err}
	}

	// get appropriate value function
	var valueFunc valueFuncType
	for _, option := range orderTable {
		if option.name == orderBy {
			valueFunc = option.orderFunc
			break
		}
	}
	if valueFunc == nil {
		return &RecordChain{recordChan: nil, err: fmt.Errorf("Unknown orderBy: %s", orderBy)}
	}

	// write the sorted records to this channel
	writeChan := make(chan *FlowRecordV3, 128)

	// fire off goroutine
	go func(readChan chan *FlowRecordV3) {
		defer close(writeChan)

		// The buffers are local to this goroutine on purpose: with shared
		// package-level buffers, chained or concurrent OrderBy calls would
		// overwrite each other's data. Being local also lets them be
		// garbage collected as soon as the sorted records are delivered.
		const initialCap = 1024 * 1024 // initial capacity - 1 meg entries

		// records keeps the flow records in arrival order; they are never moved.
		records := make([]*FlowRecordV3, 0, initialCap)
		// keys holds the value to be sorted plus the index of the matching
		// flow record in records. Only these small entries are moved by the sort.
		keys := make([]sortRecord, 0, initialCap)

		for record := range readChan {
			// sortRecord.index is 32 bits wide, which bounds the number of
			// records that can be addressed.
			keys = append(keys, sortRecord{index: uint32(len(records)), value: valueFunc(record)})
			records = append(records, record)
		}

		// sort the keys by value
		sorts.ByUint64(sortType(keys))

		if direction == ASCENDING {
			for i := range keys {
				writeChan <- records[keys[i].index]
			}
		} else {
			// walk the ascending result backwards for descending order
			for i := len(keys) - 1; i >= 0; i-- {
				writeChan <- records[keys[i].index]
			}
		}
	}(recordChain.recordChan)

	// return chain element
	return &RecordChain{recordChan: writeChan, err: nil}
} // End of OrderBy