inkflux.py
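# inkflux.py tails one or more Squid-style access logs and forwards each parsed
# entry to an InfluxDB v1 HTTP /write endpoint as line protocol.
# Assumed third-party dependencies (package names inferred from the imports
# below, not stated in the original): requests, python-dotenv, inotify (Linux only).
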
import os
import re
import json
import logging
import threading
from datetime import datetime

import requests
import inotify.adapters
from dotenv import load_dotenv

DEFAULT_REGEX = r'(?P<timestamp>\d+\.\d+)\s+(?P<response_time>\d+)\s+(?P<client_ip>\d+\.\d+\.\d+\.\d+)\s+(?P<cache_result>\S+)\s+(?P<bytes>\d+)\s+(?P<method>\S+)\s+(?P<url>\S+)\s+-\s+(?P<cache_peer>\S+)\s+-\s*'

# Load Configurations
load_dotenv()
CONFIG_FILE = os.getenv("CONFIG_FILE_PATH", "config.json")
INKFLUX_LOG = os.getenv("INKFLUX_LOG", "inkflux.log")

# Logging Setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler(INKFLUX_LOG), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
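
# Illustrative .env (hypothetical paths, not part of the original; both
# variables are optional and fall back to "config.json" and "inkflux.log"):
#   CONFIG_FILE_PATH=/etc/inkflux/config.json
#   INKFLUX_LOG=/var/log/inkflux.log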


def convert_timestamp(unix_timestamp):
    """Convert Unix timestamp to human readable format"""
    try:
        dt = datetime.fromtimestamp(float(unix_timestamp))
        return dt.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]  # Truncate microseconds to milliseconds
    except ValueError as e:
        logger.error(f"Error converting timestamp {unix_timestamp}: {e}")
        return unix_timestamp


def get_file_lines_count(filename):
    """Get total number of lines in a file"""
    try:
        with open(filename) as f:
            return sum(1 for _ in f)
    except Exception as e:
        logger.error(f"Error counting lines in {filename}: {e}")
        return 0


def check_influxdb_connection(url, port, db):
    """Check InfluxDB connection validity"""
    try:
        ping_url = f"{url}:{port}/ping?db={db}"
        logger.debug(f"Pinging InfluxDB at {ping_url}")
        response = requests.get(ping_url, timeout=5)
        return response.status_code == 204
    except requests.RequestException as e:
        logger.error(f"InfluxDB Connection Error: {e}")
        return False


def process_line(line, regex, write_url, measurement_name):
    """Process a single line of the log file"""
    line = line.strip()
    match = re.match(regex, line)
    if match:
        data = match.groupdict()

        # Convert timestamp for logging
        human_readable_time = convert_timestamp(data['timestamp'])
        logger.debug(f"Matched data at {human_readable_time}: {data}")

        # Convert timestamp to nanoseconds for InfluxDB
        timestamp_ns = int(float(data['timestamp']) * 1_000_000_000)

        # Prepare InfluxDB line protocol
        line_protocol = (
            f"{measurement_name},"
            f"client_ip={data['client_ip']},"
            f"cache_result={data['cache_result']},"
            f"method={data['method']},"
            f"url={data['url']},"
            f"cache_peer={data['cache_peer']} "
            f"response_time={int(data['response_time'])}i,"
            f"bytes={int(data['bytes'])}i "
            f"{timestamp_ns}"
        )

        # Send data to InfluxDB API
        try:
            response = requests.post(
                write_url,
                data=line_protocol.encode('utf-8'),
                headers={'Content-Type': 'text/plain'}
            )
            response.raise_for_status()
            logger.debug(f"Data from {human_readable_time} successfully sent to InfluxDB")
            return True
        except requests.RequestException as e:
            logger.error(f"Failed to send data from {human_readable_time} to InfluxDB: {e}")
            return False
    else:
        logger.debug("No match")
        return False
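
# Illustrative example (hypothetical values, not taken from a real log): a Squid
# access.log entry such as
#   1714000000.500     45 192.168.1.10 TCP_MISS/200 1024 GET http://example.com/index.html - HIER_DIRECT/93.184.216.34 -
# matches DEFAULT_REGEX and, with the default measurement name, is posted to the
# /write endpoint roughly as
#   squid-access_log,client_ip=192.168.1.10,cache_result=TCP_MISS/200,method=GET,url=http://example.com/index.html,cache_peer=HIER_DIRECT/93.184.216.34 response_time=45i,bytes=1024i 1714000000500000000
# Note that tag values containing spaces, commas or '=' would need escaping per
# the line-protocol rules; this sketch assumes they do not occur.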


def process_log(log_name, config):
    """Process a single log file based on its configuration"""
    influxdb_url = config.get("influxdb_url", "http://localhost")
    influxdb_port = config.get("influxdb_port", "8086")
    influxdb_db = config.get("influxdb_db", "default_db")
    measurement_name = config.get("measurement_name", "squid-access_log")
    read_from_end = str(config.get("read_from_end", "True"))
    regex = config.get("regex", DEFAULT_REGEX)
    logfile = config.get("logfile", "/var/log/squid/access.log")

    if regex == "default":
        regex = DEFAULT_REGEX

    # Check InfluxDB Connection
    if not check_influxdb_connection(influxdb_url, influxdb_port, influxdb_db):
        logger.error(f"Cannot connect to InfluxDB for {log_name}. Skipping.")
        return

    # Check for log file presence
    if not logfile or not os.path.exists(logfile):
        logger.error(f"Log file not found for {log_name}: {logfile}")
        return

    logger.info(f"Loading log file for {log_name}: {logfile}")

    try:
        with open(logfile) as f:
            write_url = f"{influxdb_url}:{influxdb_port}/write?db={influxdb_db}"

            if read_from_end == "True":
                logger.info(f"{log_name}: Started reading from the last line.")
                f.seek(0, os.SEEK_END)
            else:
                logger.info(f"{log_name}: Started reading from the first line.")
                f.seek(0, os.SEEK_SET)

            # Get total lines for progress tracking
            total_lines = get_file_lines_count(logfile)
            processed_lines = 0
            matched_lines = 0

            # Peek at the first line to log the start time, then rewind so the
            # loop below processes every existing line exactly once. When reading
            # from the end, readline() returns "" and this whole block is skipped.
            first_line = f.readline().strip()
            first_match = re.match(regex, first_line)
            if first_match:
                start_time = convert_timestamp(first_match.groupdict()['timestamp'])
                logger.info(f"{log_name}: Starting processing from {start_time}")
                logger.info(f"{log_name}: Processing {total_lines} existing log entries...")
                f.seek(0)  # Reset to beginning for full processing
                for existing_line in f:
                    processed_lines += 1
                    if process_line(existing_line, regex, write_url, measurement_name):
                        matched_lines += 1

                    # Log progress every 1000 lines or at the final line
                    if processed_lines % 1000 == 0 or processed_lines == total_lines:
                        progress = (processed_lines / total_lines) * 100
                        remaining = total_lines - processed_lines

                        # Get current timestamp from the last processed line
                        current_match = re.match(regex, existing_line.strip())
                        if current_match:
                            current_time = convert_timestamp(current_match.groupdict()['timestamp'])
                            logger.info(
                                f"{log_name}: Processed {processed_lines}/{total_lines} lines "
                                f"({progress:.1f}%). Remaining: {remaining} lines. "
                                f"Matched: {matched_lines} lines. "
                                f"Current timestamp: {current_time}"
                            )

                logger.info(
                    f"{log_name}: Finished processing existing log entries. "
                    f"Total processed: {processed_lines}, Matched: {matched_lines}"
                )

            # Set up inotify watch for new changes
            i = inotify.adapters.Inotify()
            i.add_watch(logfile)
            logger.info(f"{log_name}: Watching for new log entries at: {write_url}")

            # Watch for new changes; drain every newly appended line per modify event
            for event in i.event_gen(yield_nones=False):
                (_, type_names, path, filename) = event
                if 'IN_MODIFY' in type_names:
                    while True:
                        line = f.readline()
                        if not line:
                            break
                        process_line(line, regex, write_url, measurement_name)
    except Exception as e:
        logger.error(f"{log_name}: Error processing log: {e}")


def main():
    """Main function to handle multiple log processing"""
    try:
        with open(CONFIG_FILE) as f:
            conf = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        logger.error(f"Error loading configuration: {e}")
        return

    # Start a thread for each log configuration
    threads = []
    for log_name, log_config in conf.items():
        thread = threading.Thread(
            target=process_log,
            args=(log_name, log_config),
            name=f"LogProcessor-{log_name}"
        )
        threads.append(thread)
        thread.start()
        logger.info(f"Started processing thread for {log_name}")

    # Wait for all threads to complete
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()