-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: main.py
182 lines (129 loc) · 5.85 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/env python3
#
# main.py - Main execution logic.
#
import json
import os
import requests
import sys
import time
from cleaner import *
from config import *
from const import *
from file import *
print()

# --- Argument parsing and path setup ------------------------------------
# Arg 1 (mandatory): the file key naming the data/state file pair.
try:
    file_key = sys.argv[1]
    print("File key supplied as an argument:", sys.argv[1])
except IndexError:  # was a bare except; only a missing argv entry is expected
    print("No file key specified, exiting.")
    sys.exit(1)  # nonzero exit on error; sys.exit is safer than builtin exit()

# Build absolute paths so the script works under cron, whose working
# directory is not the script directory.
absolute_path = os.path.dirname(__file__)
data_file_path = os.path.join(absolute_path, "data", file_key + ".json")
state_file_path = os.path.join(absolute_path, "var", file_key + ".state")
print("Data File Location: ", data_file_path)
print("State File Location: ", state_file_path, "\n")

# Arg 2 (optional): the Splunk index to send to.
# NOTE(review): the fallback assumes `splunk_index` is defined by one of
# the star imports (presumably config) — if it is not, the print below
# raises NameError; confirm against config.py.
try:
    splunk_index = sys.argv[2]
    print("Index supplied as an argument:", sys.argv[2])
except IndexError:  # was a bare except
    print("No index specified, defaulting to config.")
print("Sending to Index:", splunk_index, "\n")
# --- State/data file setup and playout duration -------------------------
# Resume from the existing state file, or create a fresh one.
# (Was two separate existence checks; collapsed to one if/else.)
if os.path.exists(state_file_path):
    state_tracker = load_state_file(state_file_path)
else:
    state_tracker = create_state_file(state_file_path)

# The data file is mandatory; bail out early if it is missing.
if not os.path.exists(data_file_path):
    print("Data file not found:", data_file_path)
    print("Exiting.\n")
    sys.exit(1)  # nonzero exit on error (was exit() with status 0)
data_file_length = get_data_file_length(data_file_path)

# Arg 3 (optional): playout duration in seconds. Always overrides the
# state tracker's stored value. A missing OR non-integer argument falls
# back to the config default, matching the original bare-except behavior.
# NOTE(review): assumes `time_playout_seconds` is defined by a star
# import when the argument is absent — otherwise the print below raises
# NameError; confirm against config.py.
try:
    time_playout_seconds = int(sys.argv[3])
    print("Playout duration supplied as an argument:", sys.argv[3])
except (IndexError, ValueError):
    print("No playout duration, defaulting to config.")
print("Playout Duration in Seconds:", time_playout_seconds, "\n")
# --- Main loop -----------------------------------------------------------
def _flush_batch(session, payload):
    """POST one batch of newline-delimited events to the Splunk HEC.

    `splunk_url`, `splunk_hec_event_endpoint`, and `splunk_auth_header`
    come from the star imports above (presumably config/const — confirm).
    NOTE(review): verify=False is kept from the original (likely a
    self-signed HEC certificate); consider pinning the CA instead.
    """
    return session.post(splunk_url + splunk_hec_event_endpoint,
                        headers=splunk_auth_header,
                        data=payload,
                        verify=False)

try:
    # Persistent session keeps the TCP connection to HEC alive across
    # batches instead of reconnecting on every POST.
    session = requests.session()
    print("\nBegin Main Loop")
    print("Starting at Line:", state_tracker['current_line'])
    state_tracker['time_window'] = time_playout_seconds
    # Events per second needed to play the whole file out in the window;
    # floor at 1 so the batch modulo test below can never be % 0.
    state_tracker['eps'] = round(data_file_length / time_playout_seconds, 0)
    if state_tracker['eps'] < 1:
        state_tracker['eps'] = 1
    print("Events Per Second:", state_tracker['eps'], "\n")
    write_state_to_disk(state_file_path, state_tracker)
    # BUG FIX: event_json_storage was never initialized before the `+=`
    # in the loop below, raising NameError on the first iteration.
    event_json_storage = ""
    timer_start = time.time()
    # Per-line loop (was `while 1==1`)
    while True:
        # Read one event line and wrap it in the HEC event envelope.
        current_line_json = get_line(data_file_path, int(state_tracker['current_line']))
        event = cleaner({"time": time.time(),
                         "index": splunk_index,
                         "host": current_line_json['host'],
                         "source": current_line_json['source'],
                         "sourcetype": current_line_json['sourcetype'],
                         "event": current_line_json['event'] })
        # Accumulate events so they are sent as one batch per second.
        event_json_storage += json.dumps(event) + "\r\n"
        # Mod the current_line to send events as a batch per the eps variable
        if state_tracker['current_line'] % state_tracker['eps'] == 0:
            r = _flush_batch(session, event_json_storage)
            event_json_storage = ""
            # If the batch completed in < 1 second (it typically does),
            # sleep out the remainder to hold the target EPS.
            timer_duration = time.time() - timer_start
            if 1 - timer_duration > 0:
                time.sleep(1 - timer_duration)
            timer_start = time.time()
            # Persist state each batch so an interrupted run can resume.
            write_state_to_disk(state_file_path, state_tracker)
            # `debug` comes from a star import (presumably config).
            if debug:
                print("-> Sent up to Line:", state_tracker['current_line'], " Sleeping:", 1 - timer_duration)
        # Periodic progress report to stdout.
        if state_tracker['current_line'] % state_tracker_reporting_factor == 0:
            print("State Tracker:", state_tracker)
        # Advance to the next line
        state_tracker['current_line'] += 1
        # End of file: flush the partial batch, then either wrap around
        # (should_loop) or clean up the state file and exit.
        if int(state_tracker['current_line']) == int(data_file_length):
            if should_loop:
                print("Reached EoF - Starting Over ", state_tracker)
                r = _flush_batch(session, event_json_storage)
                event_json_storage = ""
                state_tracker['current_line'] = 1
                print("State Reset Completed", state_tracker)
            else:
                r = _flush_batch(session, event_json_storage)
                event_json_storage = ""
                delete_state_file(state_file_path)
                print("Reached EoF - Exiting")
                sys.exit(0)
# If we get interrupted at the keyboard (Ctrl^C)
except KeyboardInterrupt:
    # Dump current state so the run can resume where it left off.
    write_state_to_disk(state_file_path, state_tracker)
    # Close our HEC session
    session.close()
    # Final logs
    print("Caught Keyboard Interrupt - Quitting")
    print("State Tracker Written to Disk:", state_tracker, "\n\n")