Commit

Merge pull request #5 from danielmitterdorfer/master
Improve bulk indexing performance
cdahlqvist committed Nov 29, 2017
2 parents b0bc233 + aa9c27f commit dbb2aa7
Showing 8 changed files with 162 additions and 91 deletions.
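For context on the changes below: the Elasticsearch _bulk API accepts newline-delimited JSON, one action-metadata line followed by one document-source line per event. The commit exploits this by keeping both lines as pre-rendered strings all the way through the parameter source. A minimal sketch of a two-document bulk body (all values invented for illustration):

two_docs = (
    '{"index": {"_index": "elasticlogs", "_type": "logs"}}\n'
    '{"@timestamp": "2017-11-29T10:00:00.000Z", "response": 200}\n'
    '{"index": {"_index": "elasticlogs", "_type": "logs"}}\n'
    '{"@timestamp": "2017-11-29T10:00:01.000Z", "response": 404}\n'
)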
Binary file modified eventdata/parameter_sources/data/agents.json.gz
Binary file modified eventdata/parameter_sources/data/agents_name_lookup.json.gz
Binary file not shown.
Binary file modified eventdata/parameter_sources/data/requests.json.gz
7 changes: 4 additions & 3 deletions eventdata/parameter_sources/elasticlogs_bulk_source.py
@@ -3,6 +3,7 @@

logger = logging.getLogger("track.elasticlogs")

+
class ElasticlogsBulkSource:
    """
    Generates a bulk indexing request for elasticlogs data.
@@ -80,12 +81,12 @@ def params(self):
        bulk_array = []
        for x in range(0, self._bulk_size):
            evt, idx, typ = self._randomevent.generate_event()
-            bulk_array.append({'index': {'_index': idx, '_type': typ}})
+            bulk_array.append('{"index": {"_index": "%s", "_type": "%s"}}' % (idx, typ))
            bulk_array.append(evt)

-        response = { "body": bulk_array, "action_metadata_present": True, "bulk-size": self._bulk_size }
+        response = { "body": "\n".join(bulk_array), "action_metadata_present": True, "bulk-size": self._bulk_size }

        if "pipeline" in self._params.keys():
-            response["pipeline"] = params["pipeline"]
+            response["pipeline"] = self._params["pipeline"]

        return response
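The core of this file's change: the parameter source used to emit action metadata as Python dicts, which left JSON serialization of every bulk line to the Elasticsearch client on every request; it now emits a single pre-joined string. A rough standalone timing sketch of that difference (standard library only; the document shape and sizes are invented, so treat the ratio as indicative rather than authoritative):

import json
import timeit

action = {"index": {"_index": "elasticlogs", "_type": "logs"}}
doc = {"@timestamp": "2017-11-29T10:00:00.000Z", "clientip": "10.0.0.1", "response": 200}

def body_from_dicts():
    # old approach: build dicts, serialize every line at request time
    bulk = []
    for _ in range(1000):
        bulk.append(action)
        bulk.append(doc)
    return "\n".join(json.dumps(item) for item in bulk)

action_line = json.dumps(action)
doc_line = json.dumps(doc)

def body_from_strings():
    # new approach: lines are pre-serialized, only the join remains
    bulk = []
    for _ in range(1000):
        bulk.append(action_line)
        bulk.append(doc_line)
    return "\n".join(bulk)

print("dicts:  ", timeit.timeit(body_from_dicts, number=100))
print("strings:", timeit.timeit(body_from_strings, number=100))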
96 changes: 53 additions & 43 deletions eventdata/parameter_sources/randomevent.py
@@ -1,7 +1,5 @@
import json
import random
-import datetime
-import calendar
import gzip
import re
import os
@@ -10,98 +8,101 @@

cwd = os.path.dirname(__file__)

+
class Agent:
    def __init__(self):
        self._agents = WeightedArray('%s/data/agents.json.gz' % cwd)

        with gzip.open('%s/data/agents_name_lookup.json.gz' % cwd, 'rt') as data_file:
            self._agents_name_lookup = json.load(data_file)

        with gzip.open('%s/data/agents_os_lookup.json.gz' % cwd, 'rt') as data_file:
            self._agents_os_lookup = json.load(data_file)

        with gzip.open('%s/data/agents_os_name_lookup.json.gz' % cwd, 'rt') as data_file:
            self._agents_os_name_lookup = json.load(data_file)

    def add_fields(self, event):
        agent = self._agents.get_random()

-        event['agent'] = "\"{}\"".format(agent[0])
-        event['useragent'] = {}
-        event['useragent']['os'] = self._agents_os_lookup[agent[1]]
-        event['useragent']['os_name'] = self._agents_os_name_lookup[agent[2]]
-        event['useragent']['name'] = self._agents_name_lookup[agent[3]]
+        event['agent'] = agent[0]
+        event['useragent_os'] = self._agents_os_lookup[agent[1]]
+        event['useragent_os_name'] = self._agents_os_name_lookup[agent[2]]
+        event['useragent_name'] = self._agents_name_lookup[agent[3]]


class ClientIp:
    def __init__(self):
        self._rare_clientip_probability = 0.269736965199
        self._clientips = WeightedArray('%s/data/clientips.json.gz' % cwd)
        self._rare_clientips = WeightedArray('%s/data/rare_clientips.json.gz' % cwd)

        with gzip.open('%s/data/clientips_country_lookup.json.gz' % cwd, 'rt') as data_file:
            self._clientips_country_lookup = json.load(data_file)

        with gzip.open('%s/data/clientips_location_lookup.json.gz' % cwd, 'rt') as data_file:
            self._clientips_location_lookup = json.load(data_file)

    def add_fields(self, event):
        p = random.random()
-        event['geoip'] = {}

        if p < self._rare_clientip_probability:
            data = self._rare_clientips.get_random()
            event['clientip'] = self.__fill_out_ip_prefix(data[0])
        else:
            data = self._clientips.get_random()
            event['clientip'] = data[0]

-        event['geoip']['country_name'] = self._clientips_country_lookup[data[1]]
-        event['geoip']['location'] = self._clientips_location_lookup[data[2]]
+        event['country_name'] = self._clientips_country_lookup[data[1]]
+        event['location'] = self._clientips_location_lookup[data[2]]

    def __fill_out_ip_prefix(self, ip_prefix):
        rnd1 = random.random()
-        v1 = rnd1 * ( 1 - rnd1) * 255 * 4
+        v1 = rnd1 * (1 - rnd1) * 255 * 4
        k1 = (int)(v1)
        rnd2 = random.random()
-        v2 = rnd2 * ( 1 - rnd2) * 255 * 4
+        v2 = rnd2 * (1 - rnd2) * 255 * 4
        k2 = (int)(v2)

        return "{}.{}.{}".format(ip_prefix, k1, k2)


class Referrer:
    def __init__(self):
        self._referrers = WeightedArray('%s/data/referrers.json.gz' % cwd)

        with gzip.open('%s/data/referrers_url_base_lookup.json.gz' % cwd, 'rt') as data_file:
            self._referrers_url_base_lookup = json.load(data_file)

    def add_fields(self, event):
        data = self._referrers.get_random()
-        event['referrer'] = "\"{}{}\"".format(self._referrers_url_base_lookup[data[0]], data[1])
+        event['referrer'] = "%s%s" % (self._referrers_url_base_lookup[data[0]], data[1])


class Request:
    def __init__(self):
        self._requests = WeightedArray('%s/data/requests.json.gz' % cwd)

        with gzip.open('%s/data/requests_url_base_lookup.json.gz' % cwd, 'rt') as data_file:
            self._requests_url_base_lookup = json.load(data_file)

    def add_fields(self, event):
        data = self._requests.get_random()

        event['request'] = "{}{}".format(self._requests_url_base_lookup[data[0]], data[1])
        event['bytes'] = data[2]
        event['verb'] = data[3]
        event['response'] = data[4]
        event['httpversion'] = data[5]


class RandomEvent:
    def __init__(self, params):
        self._agent = Agent()
        self._clientip = ClientIp()
        self._referrer = Referrer()
        self._request = Request()
+        # We will reuse the event dictionary. This assumes that each field will be present (and thus overwritten) in each event.
+        # This reduces object churn and improves peak indexing throughput.
+        self._event = {}

        self._index = 'elasticlogs'
        self._index_pattern = False
@@ -116,13 +117,13 @@ def __init__(self, params):
            self._index_pattern = True

        self._type = 'logs'
        if 'type' in params.keys():
            self._type = params['type']

        if 'starting_point' in params.keys():
            sp = params['starting_point']
        else:
-            sp ="now"
+            sp = "now"

        if 'end_point' in params.keys():
            ep = params['end_point']
@@ -140,23 +141,37 @@ def __init__(self, params):
            self._delete_fields.append(d.split('.'))

    def generate_event(self):
-        event = {}
+        timestruct = self._timestamp_generator.generate_timestamp_struct()
+        index_name = self.__generate_index_pattern(timestruct)
+
+        event = self._event
+        event["@timestamp"] = timestruct["iso"]

        self._agent.add_fields(event)
        self._clientip.add_fields(event)
        self._referrer.add_fields(event)
        self._request.add_fields(event)

-        timestruct = self._timestamp_generator.generate_timestamp_struct()
-        event['@timestamp'] = timestruct['iso']
-
        if 'message' not in self._delete_fields:
-            event['message'] = self.__generate_message_field(event)
-
-        index_name = self.__generate_index_pattern(timestruct)
-
-        self.__delete_requested_fields(event)
-
-        return event, index_name, self._type
+            # note the leading comma!
+            message = ',"message":"%s"' % self.__generate_message_field(event)
+        else:
+            message = ''
+
+        line = '{"@timestamp": "%s", ' \
+               '"agent":"%s","useragent":{"os":"%s","os_name":"%s","name":"%s"},' \
+               '"clientip":"%s","geoip":{"country_name":"%s","location":%s},' \
+               '"referrer":"%s",' \
+               '"request": "%s","bytes":%s,"verb":"%s","response":%s,"httpversion":"%s"' \
+               '%s}' % \
+               (event["@timestamp"],
+                event["agent"], event["useragent_os"], event["useragent_os_name"], event["useragent_name"],
+                event["clientip"], event["country_name"], event["location"],
+                event["referrer"],
+                event["request"], event["bytes"], event["verb"], event["response"], event["httpversion"],
+                message)
+
+        return line, index_name, self._type

    def __generate_index_pattern(self, timestruct):
        if self._index_pattern:
@@ -165,11 +180,6 @@ def __generate_index_pattern(self, timestruct):
            return self._index

    def __generate_message_field(self, event):
-        return '{} - - [{}] "{} {} HTTP/{}" {} {} "-" {} {}'.format(event['clientip'], event['@timestamp'], event['verb'], event['request'], event['httpversion'], event['response'], event['bytes'], event['referrer'], event['agent'])
+        return '{} - - [{}] \\"{} {} HTTP/{}\\" {} {} \\"-\\" {} {}'.format(event['clientip'], event['@timestamp'], event['verb'],
+                                                                            event['request'], event['httpversion'], event['response'],
+                                                                            event['bytes'], event['referrer'], event['agent'])

-    def __delete_requested_fields(self, event):
-        for d in self._delete_fields:
-            if len(d) == 1 and d[0] in event.keys():
-                del event[d[0]]
-            elif len(d) == 2 and d[0] in event.keys() and d[1] in event[d[0]].keys():
-                del event[d[0]][d[1]]
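Two techniques combine in randomevent.py: the event dict is allocated once and reused (safe only because every field is overwritten on each call, as the added comment notes), and the final document is rendered with a single %-template instead of json.dumps on a nested dict. Below is a minimal sketch of the pattern with invented field names; one caveat is that %-templating silently produces broken JSON if a value carries an unescaped quote, which is why the new message field escapes its quotes:

import json
import timeit

LINE = '{"@timestamp":"%s","clientip":"%s","response":%d}'
event = {}  # allocated once and reused; every key is overwritten per call

def render_template():
    event["@timestamp"] = "2017-11-29T10:00:00.000Z"
    event["clientip"] = "10.0.0.1"
    event["response"] = 200
    return LINE % (event["@timestamp"], event["clientip"], event["response"])

def render_dumps():
    return json.dumps({"@timestamp": "2017-11-29T10:00:00.000Z",
                       "clientip": "10.0.0.1",
                       "response": 200})

# both render the same document; only the rendering cost differs
assert json.loads(render_template()) == json.loads(render_dumps())
print("template:", timeit.timeit(render_template, number=200000))
print("dumps:   ", timeit.timeit(render_dumps, number=200000))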
33 changes: 17 additions & 16 deletions eventdata/parameter_sources/timeutils.py
@@ -1,12 +1,10 @@
import random
import math
import datetime
import calendar
import re
-import random

epoch = datetime.datetime.utcfromtimestamp(0)


class TimeParsingError(Exception):
"""Exception raised for parameter parsing errors.
@@ -16,15 +14,18 @@ class TimeParsingError(Exception):
def __init__(self, message):
self.message = message


class TimestampStructGenerator:
    def __init__(self, starting_point, end_point=None, acceleration_factor=1.0):
        self._start_dt = None
        self._starting_point = self.__parse_point_def(starting_point)
-        if(end_point == None):
+        if end_point is None:
            self._end_point = None
        else:
            self._end_point = self.__parse_point_def(end_point)
        self._acceleration_factor = acceleration_factor
+        # reuse to reduce object churn
+        self._ts = {}

@classmethod
def StartingPoint(cls, starting_point, acceleration_factor=1.0):
@@ -35,7 +36,7 @@ def Interval(cls, starting_point, end_point):
return cls(starting_point, end_point)

def generate_timestamp_struct(self):
-        if self._end_point == None:
+        if self._end_point is None:
if self._starting_point['type'] == 'relative':
dt = datetime.datetime.utcnow() + self._starting_point['offset']
else:
@@ -61,18 +62,18 @@ def generate_timestamp_struct(self):
return self.__generate_timestamp_struct_from_datetime(dt)

def __generate_timestamp_struct_from_datetime(self, dt):
-        ts = {}
-        ts['iso'] = "{}Z".format(dt.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3])
-        ts['yyyy'] = ts['iso'][:4]
-        ts['yy'] = ts['iso'][2:4]
-        ts['mm'] = ts['iso'][5:7]
-        ts['dd'] = ts['iso'][8:10]
-        ts['hh'] = ts['iso'][11:13]
-
-        return ts
+        # string formatting is about 4 times faster than strftime.
+        iso = "%04d-%02d-%02dT%02d:%02d:%02d.%03dZ" % (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond // 1000)
+        self._ts['iso'] = iso
+        self._ts['yyyy'] = iso[:4]
+        self._ts['yy'] = iso[2:4]
+        self._ts['mm'] = iso[5:7]
+        self._ts['dd'] = iso[8:10]
+        self._ts['hh'] = iso[11:13]
+        return self._ts

def __set_start_dt_if_not_set(self):
-        if self._start_dt == None:
+        if self._start_dt is None:
self._start_dt = datetime.datetime.utcnow()

def __parse_point_def(self, point):
@@ -82,7 +83,7 @@ def __parse_point_def(self, point):
match = re.match("^now([+-]\d+)([hmd])$", point)
if match:
int_offset = int(match.group(1))

if match.group(2) == "m":
return { 'type': "relative", 'offset': datetime.timedelta(minutes=int_offset)}

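The claim in __generate_timestamp_struct_from_datetime that string formatting is about 4 times faster than strftime is easy to sanity-check in isolation. The exact ratio varies by Python version and platform, so this timeit sketch is indicative only:

import datetime
import timeit

dt = datetime.datetime(2017, 11, 29, 10, 0, 0, 123456)

def iso_strftime():
    # old approach: strftime, then truncate microseconds to milliseconds
    return "{}Z".format(dt.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3])

def iso_format():
    # new approach: plain %-formatting of the datetime components
    return "%04d-%02d-%02dT%02d:%02d:%02d.%03dZ" % (
        dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond // 1000)

assert iso_strftime() == iso_format()  # both yield 2017-11-29T10:00:00.123Z
print("strftime:", timeit.timeit(iso_strftime, number=200000))
print("format:  ", timeit.timeit(iso_format, number=200000))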